From e4ce717ebb9e0b6660c1a98d790102b3616b8997 Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Tue, 25 Jun 2019 10:28:14 -0400
Subject: [PATCH] storage: batch command application and coalesce applied state per batch
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit batches raft command application where possible. The basic approach
is to notice that many commands only "trivially" update the replica state
machine. Trivial commands can be processed in a single batch by acting on a
copy of the replica state. Non-trivial commands share the same logic but always
commit alone because they rely, for one reason or another, on having a view of
the replica or storage engine as of a specific log index.

This commit also sneaks in another optimization which batching enables. Each
command mutates a portion of replica state called the applied state, which
tracks the log index that has been applied together with the MVCC stats of the
range as of that application. Before this change, each entry would update this
applied state, and each of those writes would end up in the WAL and mem-table
just to be compacted away in L1. Now that commands are applied to the storage
engine in a single batch, it is easy to update the applied state only for the
last entry in the batch.

For sequential writes this patch shows a considerable performance win. The
benchmark below was run on a 3-node c5d.4xlarge (16 vCPU) cluster with
concurrency 128.

```
name            old ops/s   new ops/s   delta
KV0-throughput  22.1k ± 1%  32.8k ± 1%  +48.59%  (p=0.029 n=4+4)

name     old ms/s   new ms/s   delta
KV0-P50  7.15 ± 2%  6.00 ± 0%  -16.08%  (p=0.029 n=4+4)
KV0-Avg  5.80 ± 0%  3.80 ± 0%  -34.48%  (p=0.029 n=4+4)
```

Due to the re-organization of logic in this change, Replica.mu no longer needs
to be acquired as many times during the application of a batch. In the common
case it is now acquired exactly twice per batch, whereas before it was acquired
more than twice per entry. This should improve performance on large machines
that experience mutex contention on a single range, and the effect is indeed
visible there. Below are results from running a normal KV0 workload on
c5d.18xlarge machines (72 vCPU) with concurrency 1024 and 16 initial splits.

```
name            old ops/s   new ops/s    delta
KV0-throughput  78.1k ± 1%  116.8k ± 5%  +49.42%  (p=0.029 n=4+4)

name     old ms/s   new ms/s   delta
KV0-P50  24.4 ± 3%  19.7 ± 7%  -19.28%  (p=0.029 n=4+4)
KV0-Avg  12.6 ± 0%  7.5 ± 9%   -40.87%  (p=0.029 n=4+4)
```

Fixes #37426.

Release note (performance improvement): Batch raft entry application and
coalesce writes to applied state for the batch.
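To make the shape of the optimization concrete, here is a minimal,
self-contained Go sketch of the idea. It is illustrative only and not the code
in this patch: `command`, `writeBatch`, and `stateMachine` are toy stand-ins
for the decoded raft entry, engine.Batch, and Replica state. Consecutive
trivial commands are staged into one batch, a non-trivial command always gets a
batch of its own, and the applied index is recorded once per batch rather than
once per entry.

```
package main

import "fmt"

// command is a simplified stand-in for a decoded, committed raft entry.
type command struct {
	index   uint64
	key     string
	value   string
	trivial bool // trivial commands may share a batch with others
}

// writeBatch is a toy stand-in for an engine write batch.
type writeBatch struct {
	writes map[string]string
}

func newWriteBatch() *writeBatch {
	return &writeBatch{writes: map[string]string{}}
}

// stateMachine is a toy replica state machine with an applied index.
type stateMachine struct {
	data         map[string]string
	appliedIndex uint64
}

// commit atomically applies the staged writes and records the applied
// index once per batch rather than once per entry.
func (sm *stateMachine) commit(b *writeBatch, appliedIndex uint64) {
	for k, v := range b.writes {
		sm.data[k] = v
	}
	sm.appliedIndex = appliedIndex
}

// applyCommitted stages consecutive trivial commands into one batch;
// a non-trivial command always commits in a batch of its own.
func (sm *stateMachine) applyCommitted(cmds []command) {
	for len(cmds) > 0 {
		b := newWriteBatch()
		var last uint64
		n := 0
		for n < len(cmds) {
			cmd := cmds[n]
			if !cmd.trivial && n > 0 {
				break // leave the non-trivial command for its own batch
			}
			b.writes[cmd.key] = cmd.value
			last = cmd.index
			n++
			if !cmd.trivial {
				break // a non-trivial command commits alone
			}
		}
		sm.commit(b, last)
		cmds = cmds[n:]
	}
}

func main() {
	sm := &stateMachine{data: map[string]string{}}
	sm.applyCommitted([]command{
		{index: 1, key: "a", value: "1", trivial: true},
		{index: 2, key: "b", value: "2", trivial: true},
		{index: 3, key: "split", value: "x", trivial: false},
		{index: 4, key: "c", value: "3", trivial: true},
	})
	// The applied index was written three times (once per batch), not four.
	fmt.Println(sm.appliedIndex, sm.data)
}
```

Running the sketch applies four entries in three batches and prints
`4 map[a:1 b:2 c:3 split:x]`; in the real change the analogous saving is the
per-entry applied-state write that previously hit the WAL and mem-table.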
--- pkg/storage/cmd_app_batch.go | 131 ++++ pkg/storage/cmd_app_ctx.go | 134 ++++ pkg/storage/cmd_app_ctx_buf.go | 116 +++ pkg/storage/cmd_app_ctx_buf_test.go | 54 ++ pkg/storage/replica_application.go | 848 ++++++++++++++++++++++ pkg/storage/replica_application_result.go | 430 +++++++++++ pkg/storage/replica_proposal.go | 267 ------- pkg/storage/replica_raft.go | 837 +-------------------- pkg/storage/replica_test.go | 3 +- pkg/storage/store.go | 4 +- pkg/storage/track_raft_protos.go | 4 +- 11 files changed, 1752 insertions(+), 1076 deletions(-) create mode 100644 pkg/storage/cmd_app_batch.go create mode 100644 pkg/storage/cmd_app_ctx.go create mode 100644 pkg/storage/cmd_app_ctx_buf.go create mode 100644 pkg/storage/cmd_app_ctx_buf_test.go create mode 100644 pkg/storage/replica_application.go create mode 100644 pkg/storage/replica_application_result.go diff --git a/pkg/storage/cmd_app_batch.go b/pkg/storage/cmd_app_batch.go new file mode 100644 index 000000000000..72c423fca530 --- /dev/null +++ b/pkg/storage/cmd_app_batch.go @@ -0,0 +1,131 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + "sync" + + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "go.etcd.io/etcd/raft/raftpb" +) + +// cmdAppBatch accumulates state due to the application of raft +// commands. Committed raft commands are applied to the batch in a multi-stage +// process whereby individual commands are decoded, prepared for application +// relative to the current view of replicaState, committed to the storage +// engine, applied to the Replica's in-memory state and then finished by +// releasing their latches and notifying clients. +type cmdAppBatch struct { + // decodeBuf is used to decode an entry before adding it to the batch. + // See decode(). + decodeBuf decodedRaftEntry + + // batch accumulates writes implied by the raft entries in this batch. + batch engine.Batch + + // replicaState is this batch's view of the replica's state. + // It is copied from under the Replica.mu when the batch is initialized and + // is updated in stageTrivialReplicatedEvalResult. + replicaState storagepb.ReplicaState + + // stats is stored on the application batch to avoid an allocation in tracking + // the batch's view of replicaState. All pointer fields in replicaState other + // than Stats are overwritten completely rather than updated in-place. + stats enginepb.MVCCStats + + // updatedTruncatedState tracks whether any command in the batch updated the + // replica's truncated state. Truncated state updates are considered trivial + // but represent something of a special case but given their relative + // frequency and the fact that multiple updates can be trivially coalesced, we + // treat updates to truncated state as trivial. If the batch updated the + // truncated state then after it has been committed, then the side-loaded data + // and raftentry.Cache should be truncated to the index indicated. + // TODO(ajwerner): consider whether this logic should imply that commands + // which update truncated state are non-trivial. 
+	updatedTruncatedState bool
+
+	cmdBuf cmdAppCtxBuf
+}
+
+// cmdAppBatch structs are needed to apply raft commands, which is to say,
+// frequently, so it's best to pool them rather than allocate them under the
+// raftMu.
+var cmdAppBatchSyncPool = sync.Pool{
+	New: func() interface{} {
+		return new(cmdAppBatch)
+	},
+}
+
+func getCmdAppBatch() *cmdAppBatch {
+	return cmdAppBatchSyncPool.Get().(*cmdAppBatch)
+}
+
+func releaseCmdAppBatch(b *cmdAppBatch) {
+	b.cmdBuf.clear()
+	*b = cmdAppBatch{}
+	cmdAppBatchSyncPool.Put(b)
+}
+
+// add adds the entry and its decoded state to the end of the batch.
+func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) {
+	s := b.cmdBuf.allocate()
+	s.decodedRaftEntry = d
+	s.e = e
+}
+
+// decode decodes commands from toProcess until either all of the commands have
+// been added to the batch or a non-trivial command is decoded. Non-trivial
+// commands must always be in their own batch. If a non-trivial command is
+// encountered, decode returns immediately without adding the newly decoded
+// command to the batch or removing it from remaining.
+// It is the client's responsibility to deal with non-trivial commands.
+//
+// numEmptyEntries indicates the number of entries in the consumed portion of
+// toProcess that contained a zero-byte payload.
+func (b *cmdAppBatch) decode(
+	ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry,
+) (
+	foundNonTrivialEntry bool,
+	numEmptyEntries int,
+	remaining []raftpb.Entry,
+	errExpl string,
+	err error,
+) {
+	for len(toProcess) > 0 {
+		e := &toProcess[0]
+		if len(e.Data) == 0 {
+			numEmptyEntries++
+		}
+		if errExpl, err = decodeBuf.decode(ctx, e); err != nil {
+			return false, numEmptyEntries, nil, errExpl, err
+		}
+		// This is a non-trivial entry which needs to be processed alone.
+		foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(),
+			b.replicaState.UsingAppliedStateKey)
+		if foundNonTrivialEntry {
+			break
+		}
+		// We're going to process this entry in this batch so pop it from toProcess
+		// and add it to the batch's cmdBuf.
+		toProcess = toProcess[1:]
+		b.add(e, *decodeBuf)
+	}
+	return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil
+}
+
+func (b *cmdAppBatch) reset() {
+	b.cmdBuf.clear()
+	*b = cmdAppBatch{
+		decodeBuf: b.decodeBuf, // preserve the previously decoded entry
+	}
+}
diff --git a/pkg/storage/cmd_app_ctx.go b/pkg/storage/cmd_app_ctx.go
new file mode 100644
index 000000000000..3b80f2697a14
--- /dev/null
+++ b/pkg/storage/cmd_app_ctx.go
@@ -0,0 +1,134 @@
+// Copyright 2019 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package storage
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
+	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// cmdAppCtx stores the state required to apply a single raft
+// entry to a replica. The state is accumulated in stages which occur in
+// Replica.handleCommittedEntriesRaftMuLocked.
From a high level, a command is +// decoded into an entryApplicationBatch, then if it was proposed locally the +// proposal is populated from the replica's proposals map, then the command +// is staged into the batch by writing its update to the batch's engine.Batch +// and applying its "trivial" side-effects to the batch's view of ReplicaState. +// Then the batch is committed, the side-effects are applied and the local +// result is processed. +type cmdAppCtx struct { + // e is the Entry being applied. + e *raftpb.Entry + decodedRaftEntry // decoded from e. + + // proposal is populated on the proposing Replica only and comes from the + // Replica's proposal map. + proposal *ProposalData + // ctx will be the proposal's context if proposed locally, otherwise it will + // be populated with the handleCommittedEntries ctx. + ctx context.Context + + // The below fields are set during stageRaftCommand when we validate that + // a command applies given the current lease in checkForcedErr. + leaseIndex uint64 + forcedErr *roachpb.Error + proposalRetry proposalReevaluationReason + mutationCount int // number of mutations in the WriteBatch, for writeStats + // splitMergeUnlock is acquired for splits and merges. + splitMergeUnlock func(*storagepb.ReplicatedEvalResult) + + // The below fields are set after the data has been written to the storage + // engine in prepareLocalResult. + localResult *result.LocalResult + response proposalResult +} + +func (cmd *cmdAppCtx) proposedLocally() bool { + return cmd.proposal != nil +} + +// decodedRaftEntry represents the deserialized content of a raftpb.Entry. +type decodedRaftEntry struct { + idKey storagebase.CmdIDKey + raftCmd storagepb.RaftCommand + *decodedConfChange // only non-nil for config changes +} + +// decodedConfChange represents the fields of a config change raft command. +type decodedConfChange struct { + cc raftpb.ConfChange + ccCtx ConfChangeContext +} + +func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { + return &d.raftCmd.ReplicatedEvalResult +} + +// decode decodes the entry e into the decodedRaftEntry. +func (d *decodedRaftEntry) decode( + ctx context.Context, e *raftpb.Entry, +) (errExpl string, err error) { + *d = decodedRaftEntry{} + // etcd raft sometimes inserts nil commands, ours are never nil. + // This case is handled upstream of this call. + if len(e.Data) == 0 { + return "", nil + } + switch e.Type { + case raftpb.EntryNormal: + return d.decodeNormalEntry(e) + case raftpb.EntryConfChange: + return d.decodeConfChangeEntry(e) + default: + log.Fatalf(ctx, "unexpected Raft entry: %v", e) + return "", nil // unreachable + } +} + +func (d *decodedRaftEntry) decodeNormalEntry(e *raftpb.Entry) (errExpl string, err error) { + var encodedCommand []byte + d.idKey, encodedCommand = DecodeRaftCommand(e.Data) + // An empty command is used to unquiesce a range and wake the + // leader. Clear commandID so it's ignored for processing. 
+ if len(encodedCommand) == 0 { + d.idKey = "" + } else if err := protoutil.Unmarshal(encodedCommand, &d.raftCmd); err != nil { + const errExpl = "while unmarshalling entry" + return errExpl, errors.Wrap(err, errExpl) + } + return "", nil +} + +func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) (errExpl string, err error) { + d.decodedConfChange = &decodedConfChange{} + if err := protoutil.Unmarshal(e.Data, &d.cc); err != nil { + const errExpl = "while unmarshaling ConfChange" + return errExpl, errors.Wrap(err, errExpl) + } + if err := protoutil.Unmarshal(d.cc.Context, &d.ccCtx); err != nil { + const errExpl = "while unmarshaling ConfChangeContext" + return errExpl, errors.Wrap(err, errExpl) + } + if err := protoutil.Unmarshal(d.ccCtx.Payload, &d.raftCmd); err != nil { + const errExpl = "while unmarshaling RaftCommand" + return errExpl, errors.Wrap(err, errExpl) + } + d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID) + return "", nil +} diff --git a/pkg/storage/cmd_app_ctx_buf.go b/pkg/storage/cmd_app_ctx_buf.go new file mode 100644 index 000000000000..c16640da4f5e --- /dev/null +++ b/pkg/storage/cmd_app_ctx_buf.go @@ -0,0 +1,116 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import "sync" + +// cmdAppCtxBufNodeSize is the size of the arrays in an +// cmdAppStateBufNode. +// TODO(ajwerner): justify this number. +const cmdAppCtxBufNodeSize = 8 + +// cmdAppCtxBuf is an allocation-efficient buffer used during the +// application of raft entries. Initialization occurs lazily upon the first +// call to allocate but used cmdAppCtxBuf objects should be released +// explicitly with the clear() method to release the allocated buffers back +// to the pool. +type cmdAppCtxBuf struct { + len int32 + head, tail *cmdAppCtxBufNode +} + +// cmdAppCtxBufNode is a linked-list element in an +// cmdAppStateBuf. +type cmdAppCtxBufNode struct { + len int32 + buf [cmdAppCtxBufNodeSize]cmdAppCtx + next *cmdAppCtxBufNode +} + +var cmdAppStateBufNodeSyncPool = sync.Pool{ + New: func() interface{} { return new(cmdAppCtxBufNode) }, +} + +// allocate extends the length of buf by one and returns the newly +// added element. If this is the first call to allocate it will initialize buf. +// After a buf is initialized it should be explicitly destroyed. +func (buf *cmdAppCtxBuf) allocate() *cmdAppCtx { + if buf.tail == nil { // lazy initialization + n := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) + buf.head, buf.tail = n, n + } + if buf.tail.len == cmdAppCtxBufNodeSize { + newTail := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) + buf.tail.next = newTail + buf.tail = newTail + } + ret := &buf.tail.buf[buf.tail.len] + buf.tail.len++ + buf.len++ + return ret +} + +// truncate clears all of the entries currently in a buffer and returns any +// allocated buffers to the pool. +func (buf *cmdAppCtxBuf) clear() { + for buf.head != nil { + buf.len -= buf.head.len + oldHead := buf.head + newHead := oldHead.next + buf.head = newHead + *oldHead = cmdAppCtxBufNode{} + cmdAppStateBufNodeSyncPool.Put(oldHead) + } + *buf = cmdAppCtxBuf{} +} + +// last returns a pointer to the last element in the buffer. 
+func (buf *cmdAppCtxBuf) last() *cmdAppCtx { + return &buf.tail.buf[buf.tail.len-1] +} + +// cmdAppCtxBufIterator iterates through the entries in an +// cmdAppStateBuf. +type cmdAppCtxBufIterator struct { + idx int32 + buf *cmdAppCtxBuf + node *cmdAppCtxBufNode +} + +// init seeks the iterator to the front of buf. It returns true if buf is not +// empty and false if it is. +func (it *cmdAppCtxBufIterator) init(buf *cmdAppCtxBuf) (ok bool) { + *it = cmdAppCtxBufIterator{buf: buf, node: buf.head} + return it.buf.len > 0 +} + +// cur returns the cmdAppState currently pointed to by it. +func (it *cmdAppCtxBufIterator) cur() *cmdAppCtx { + return &it.node.buf[it.idx%cmdAppCtxBufNodeSize] +} + +// isLast returns true if it currently points to the last element in the buffer. +func (it *cmdAppCtxBufIterator) isLast() bool { + return it.idx+1 == it.buf.len +} + +// next moves it to point to the next element. It returns false if it.isLast() +// is true, indicating that there are no more elements in the buffer. +func (it *cmdAppCtxBufIterator) next() bool { + if it.isLast() { + return false + } + it.idx++ + if it.idx%cmdAppCtxBufNodeSize == 0 { + it.node = it.node.next + } + return true +} diff --git a/pkg/storage/cmd_app_ctx_buf_test.go b/pkg/storage/cmd_app_ctx_buf_test.go new file mode 100644 index 000000000000..99979f35271c --- /dev/null +++ b/pkg/storage/cmd_app_ctx_buf_test.go @@ -0,0 +1,54 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/assert" +) + +// TestEntryApplicationStateBuf verifies the entryApplicationStateBuf behavior. +func TestApplicationStateBuf(t *testing.T) { + defer leaktest.AfterTest(t)() + var buf cmdAppCtxBuf + // numStates is chosen arbitrarily. + const numStates = 5*cmdAppCtxBufNodeSize + 1 + // Test that the len field is properly updated. + var states []*cmdAppCtx + for i := 0; i < numStates; i++ { + assert.Equal(t, i, int(buf.len)) + states = append(states, buf.allocate()) + assert.Equal(t, i+1, int(buf.len)) + } + // Test that last returns the correct value. + last := states[len(states)-1] + assert.Equal(t, last, buf.last()) + // Test the iterator. + var it cmdAppCtxBufIterator + i := 0 + for ok := it.init(&buf); ok; ok = it.next() { + assert.Equal(t, states[i], it.cur()) + i++ + } + assert.Equal(t, i, numStates) // make sure we saw them all + assert.True(t, it.isLast()) + // Test clear. + buf.clear() + assert.EqualValues(t, buf, cmdAppCtxBuf{}) + assert.Equal(t, 0, int(buf.len)) + assert.Panics(t, func() { buf.last() }) + assert.False(t, it.init(&buf)) + // Test clear on an empty buffer. + buf.clear() + assert.EqualValues(t, buf, cmdAppCtxBuf{}) +} diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go new file mode 100644 index 000000000000..a02b104c1100 --- /dev/null +++ b/pkg/storage/replica_application.go @@ -0,0 +1,848 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/stateloader" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/kr/pretty" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft/raftpb" +) + +// handleCommittedEntriesStats returns stats about what happened during the +// application of a set of raft entries. +// +// TODO(ajwerner): add metrics to go with these stats. +type handleCommittedEntriesStats struct { + batchesProcessed int + entriesProcessed int + stateAssertions int + numEmptyEntries int +} + +// handleCommittedEntriesRaftMuLocked deals with the complexities involved in +// moving the Replica's replicated state machine forward given committed raft +// entries. All changes to r.mu.state occur downstream of this call. +// +// The stats return value reflects the number of entries which are processed, +// the number of batches used to process them, the number of entries which +// contained no data (added by etcd raft), and the number of entries which +// required asserting the on-disk state. +// +// Errors returned from this method are fatal! The errExpl is provided as a +// non-sensitive cue of where the error occurred. +// TODO(ajwerner): replace errExpl with proper error composition using the new +// errors library. +// +// At a high level, this method receives committed entries which each contains +// the evaluation of a batch (at its heart a WriteBatch, to be applied to the +// underlying storage engine), which it decodes into batches of entries which +// are safe to apply atomically together. +// +// The pseudocode looks like: +// +// while entries: +// for each entry in entries: +// decode front into temp struct +// if entry is non-trivial and batch is not empty: +// break +// add decoded command to batch +// pop front from entries +// if entry is non-trivial: +// break +// prepare local commands +// for each entry in batch: +// check if failed +// if not: +// stage in batch +// commit batch to storage engine +// update Replica.state +// for each entry in batch: +// apply side-effects, ack client, release latches +// +// The processing of committed entries proceeds in 4 stages: decoding, +// local preparation, staging, and application. Commands may be applied together +// so long as their implied state change is "trivial" (see isTrivial). Once +// decoding has discovered a batch boundary, the commands are prepared by +// reading the current replica state from underneath the Replica.mu and +// determining whether any of the commands were proposed locally. Next each +// command is written to the engine.Batch and has the "trivial" component of +// its ReplicatedEvalResult applied to the batch's view of the ReplicaState. 
+// Finally the batch is written to the storage engine, its side effects on +// the Replica's state are applied, and the clients acked with the respective +// results. +func (r *Replica) handleCommittedEntriesRaftMuLocked( + ctx context.Context, committedEntries []raftpb.Entry, +) (stats handleCommittedEntriesStats, errExpl string, err error) { + var ( + haveNonTrivialEntry bool + toProcess = committedEntries + b = getCmdAppBatch() + ) + defer releaseCmdAppBatch(b) + for len(toProcess) > 0 { + if haveNonTrivialEntry { + // If the previous call to b.decode() informed us that it left a + // non-trivial entry in decodeBuf, use it. + haveNonTrivialEntry = false + b.add(&toProcess[0], b.decodeBuf) + toProcess = toProcess[1:] + } else { + // Decode zero or more trivial entries into b. + var numEmptyEntries int + haveNonTrivialEntry, numEmptyEntries, toProcess, errExpl, err = + b.decode(ctx, toProcess, &b.decodeBuf) + if err != nil { + return stats, errExpl, err + } + // If no trivial entries were decoded go back around and process the + // non-trivial entry. + if b.cmdBuf.len == 0 { + continue + } + stats.numEmptyEntries += numEmptyEntries + } + r.retrieveLocalProposals(ctx, b) + b.batch = r.store.engine.NewBatch() + // Stage each of the commands which will write them into the newly created + // engine.Batch and update b's view of the replicaState. + var it cmdAppCtxBufIterator + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + r.stageRaftCommand(cmd.ctx, cmd, b.batch, &b.replicaState, + it.isLast() /* writeAppliedState */) + // We permit trivial commands to update the truncated state but we need to + // track whether that happens so that the side-effects of truncation occur + // after the batch is committed to storage. + updatedTruncatedState := stageTrivialReplicatedEvalResult(cmd.ctx, + cmd.replicatedResult(), cmd.e.Index, cmd.leaseIndex, &b.replicaState) + b.updatedTruncatedState = b.updatedTruncatedState || updatedTruncatedState + } + if errExpl, err = r.applyCmdAppBatch(ctx, b, &stats); err != nil { + return stats, errExpl, err + } + } + return stats, "", nil +} + +// retrieveLocalProposals populates the proposal and ctx fields for each of the +// commands in b. +func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { + r.mu.Lock() + defer r.mu.Unlock() + b.replicaState = r.mu.state + // Copy stats as it gets updated in-place in applyRaftCommandToBatch. + b.replicaState.Stats = &b.stats + *b.replicaState.Stats = *r.mu.state.Stats + var it cmdAppCtxBufIterator + haveProposalQuota := r.mu.proposalQuota != nil + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + cmd.proposal = r.mu.proposals[cmd.idKey] + if cmd.proposedLocally() { + // We initiated this command, so use the caller-supplied context. + cmd.ctx = cmd.proposal.ctx + delete(r.mu.proposals, cmd.idKey) + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if haveProposalQuota { + r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, cmd.proposal.quotaSize) + } + } else { + cmd.ctx = ctx + } + } +} + +// stageRaftCommand handles the first phase of applying a command to the +// replica state machine. 
+//
+// The proposal also contains auxiliary data which needs to be verified in order
+// to decide whether the proposal should be applied: the command's MaxLeaseIndex
+// must move the state machine's LeaseAppliedIndex forward, and the proposer's
+// lease (or rather its sequence number) must match that of the state machine,
+// and lastly the GCThreshold is validated. If any of the checks fail, the
+// proposal's content is wiped and we apply an empty log entry instead. If an
+// error occurs and the command was proposed locally, the error will be
+// communicated to the waiting proposer. The two typical cases in which errors
+// occur are lease mismatch (in which case the caller tries to send the command
+// to the actual leaseholder) and violation of the LeaseAppliedIndex (in which
+// case the proposal is retried if it was proposed locally).
+//
+// Assuming all checks were passed, the command is applied to the batch,
+// which is done by the aptly named applyRaftCommandToBatch.
+//
+// For trivial proposals this is the whole story, but some commands trigger
+// additional code in this method via a side effect (in the
+// proposal's ReplicatedEvalResult or, for local proposals,
+// LocalEvalResult). These might, for example, trigger an update of the
+// Replica's in-memory state to match updates to the on-disk state, or pass
+// intents to the intent resolver. Some commands don't fit this simple schema
+// and need to hook deeper into the code. Notably splits and merges need to
+// acquire locks on their right-hand side Replicas and may need to add data to
+// the WriteBatch before it is applied; similarly, changes to the disk layout of
+// internal state typically require a migration which shows up here. Any of this
+// logic, however, is deferred until after the batch has been written to the
+// storage engine.
+func (r *Replica) stageRaftCommand(
+	ctx context.Context,
+	cmd *cmdAppCtx,
+	batch engine.Batch,
+	replicaState *storagepb.ReplicaState,
+	writeAppliedState bool,
+) {
+	if cmd.e.Index == 0 {
+		log.Fatalf(ctx, "processRaftCommand requires a non-zero index")
+	}
+	if log.V(4) {
+		log.Infof(ctx, "processing command %x: maxLeaseIndex=%d",
+			cmd.idKey, cmd.raftCmd.MaxLeaseIndex)
+	}
+
+	var ts hlc.Timestamp
+	if cmd.idKey != "" {
+		ts = cmd.replicatedResult().Timestamp
+	}
+
+	cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr(ctx,
+		cmd.idKey, cmd.raftCmd, cmd.proposal, cmd.proposedLocally(), replicaState)
+	if cmd.forcedErr == nil {
+		// Verify that the batch timestamp is after the GC threshold. This is
+		// necessary because not all commands declare read access on the GC
+		// threshold key, even though they implicitly depend on it. This means
+		// that access to this state will not be serialized by latching,
+		// so we must perform this check upstream and downstream of raft.
+		// See #14833.
+		//
+		// We provide an empty key span because we already know that the Raft
+		// command is allowed to apply within its key range. This is guaranteed
+		// by checks upstream of Raft, which perform the same validation, and by
+		// span latches, which assure that any modifications to the range's
+		// boundaries will be serialized with this command. Finally, the
+		// leaseAppliedIndex check in checkForcedErr ensures that replays
+		// outside of the spanlatch manager's control which break this
+		// serialization ordering will already be caught and an error will be
+		// thrown.
+ cmd.forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) + } + + // applyRaftCommandToBatch will return "expected" errors, but may also indicate + // replica corruption (as of now, signaled by a replicaCorruptionError). + // We feed its return through maybeSetCorrupt to act when that happens. + if cmd.forcedErr != nil { + log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr) + } else { + log.Event(ctx, "applying command") + + if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { + log.Eventf(ctx, "unable to acquire split lock: %s", err) + // Send a crash report because a former bug in the error handling might have + // been the root cause of #19172. + _ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) { + log.SendCrashReport( + ctx, + &r.store.cfg.Settings.SV, + 0, // depth + "while acquiring split lock: %s", + []interface{}{err}, + log.ReportTypeError, + ) + }) + + cmd.forcedErr = roachpb.NewError(err) + } else if splitMergeUnlock != nil { + // Set the splitMergeUnlock on the cmdAppCtx to be called after the batch + // has been applied (see applyBatch). + cmd.splitMergeUnlock = splitMergeUnlock + } + } + + if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil { + var newPropRetry int + newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{ + CmdID: cmd.idKey, + ReplicatedEvalResult: *cmd.replicatedResult(), + StoreID: r.store.StoreID(), + RangeID: r.RangeID, + }) + if cmd.proposalRetry == 0 { + cmd.proposalRetry = proposalReevaluationReason(newPropRetry) + } + } + + if cmd.forcedErr != nil { + // Apply an empty entry. + *cmd.replicatedResult() = storagepb.ReplicatedEvalResult{} + cmd.raftCmd.WriteBatch = nil + cmd.raftCmd.LogicalOpLog = nil + } + + // Update the node clock with the serviced request. This maintains + // a high water mark for all ops serviced, so that received ops without + // a timestamp specified are guaranteed one higher than any op already + // executed for overlapping keys. + // TODO(ajwerner): coalesce the clock update per batch. + r.store.Clock().Update(ts) + + { + err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState) + + // applyRaftCommandToBatch returned an error, which usually indicates + // either a serious logic bug in CockroachDB or a disk + // corruption/out-of-space issue. Make sure that these fail with + // descriptive message so that we can differentiate the root causes. + if err != nil { + log.Errorf(ctx, "unable to update the state machine: %+v", err) + // Report the fatal error separately and only with the error, as that + // triggers an optimization for which we directly report the error to + // sentry (which in turn allows sentry to distinguish different error + // types). + log.Fatal(ctx, err) + } + } + + if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil { + cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta() + cmd.replicatedResult().DeprecatedDelta = nil + } + + // AddSSTable ingestions run before the actual batch gets written to the + // storage engine. This makes sure that when the Raft command is applied, + // the ingestion has definitely succeeded. Note that we have taken + // precautions during command evaluation to avoid having mutations in the + // WriteBatch that affect the SSTable. Not doing so could result in order + // reversal (and missing values) here. 
+ // + // NB: any command which has an AddSSTable is non-trivial and will be + // applied in its own batch so it's not possible that any other commands + // which precede this command can shadow writes from this SSTable. + if cmd.replicatedResult().AddSSTable != nil { + copied := addSSTablePreApply( + ctx, + r.store.cfg.Settings, + r.store.engine, + r.raftMu.sideloaded, + cmd.e.Term, + cmd.e.Index, + *cmd.replicatedResult().AddSSTable, + r.store.limiters.BulkIOWriteRate, + ) + r.store.metrics.AddSSTableApplications.Inc(1) + if copied { + r.store.metrics.AddSSTableApplicationCopies.Inc(1) + } + cmd.replicatedResult().AddSSTable = nil + } + + if cmd.replicatedResult().Split != nil { + // Splits require a new HardState to be written to the new RHS + // range (and this needs to be atomic with the main batch). This + // cannot be constructed at evaluation time because it differs + // on each replica (votes may have already been cast on the + // uninitialized replica). Write this new hardstate to the batch too. + // See https://github.com/cockroachdb/cockroach/issues/20629 + splitPreApply(ctx, batch, cmd.replicatedResult().Split.SplitTrigger) + } + + if merge := cmd.replicatedResult().Merge; merge != nil { + // Merges require the subsumed range to be atomically deleted when the + // merge transaction commits. + rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) + if err != nil { + log.Fatal(ctx, err) + } + const destroyData = false + err = rhsRepl.preDestroyRaftMuLocked(ctx, batch, batch, merge.RightDesc.NextReplicaID, destroyData) + if err != nil { + log.Fatal(ctx, err) + } + } +} + +func checkForcedErr( + ctx context.Context, + idKey storagebase.CmdIDKey, + raftCmd storagepb.RaftCommand, + proposal *ProposalData, + proposedLocally bool, + replicaState *storagepb.ReplicaState, +) (uint64, proposalReevaluationReason, *roachpb.Error) { + leaseIndex := replicaState.LeaseAppliedIndex + isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest + var requestedLease roachpb.Lease + if isLeaseRequest { + requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease + } + if idKey == "" { + // This is an empty Raft command (which is sent by Raft after elections + // to trigger reproposals or during concurrent configuration changes). + // Nothing to do here except making sure that the corresponding batch + // (which is bogus) doesn't get executed (for it is empty and so + // properties like key range are undefined). + return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") + } + + // Verify the lease matches the proposer's expectation. We rely on + // the proposer's determination of whether the existing lease is + // held, and can be used, or is expired, and can be replaced. + // Verify checks that the lease has not been modified since proposal + // due to Raft delays / reorderings. + // To understand why this lease verification is necessary, see comments on the + // proposer_lease field in the proto. + leaseMismatch := false + if raftCmd.DeprecatedProposerLease != nil { + // VersionLeaseSequence must not have been active when this was proposed. + // + // This does not prevent the lease race condition described below. The + // reason we don't fix this here as well is because fixing the race + // requires a new cluster version which implies that we'll already be + // using lease sequence numbers and will fall into the case below. 
+ leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*replicaState.Lease) + } else { + leaseMismatch = raftCmd.ProposerLeaseSequence != replicaState.Lease.Sequence + if !leaseMismatch && isLeaseRequest { + // Lease sequence numbers are a reflection of lease equivalency + // between subsequent leases. However, Lease.Equivalent is not fully + // symmetric, meaning that two leases may be Equivalent to a third + // lease but not Equivalent to each other. If these leases are + // proposed under that same third lease, neither will be able to + // detect whether the other has applied just by looking at the + // current lease sequence number because neither will will increment + // the sequence number. + // + // This can lead to inversions in lease expiration timestamps if + // we're not careful. To avoid this, if a lease request's proposer + // lease sequence matches the current lease sequence and the current + // lease sequence also matches the requested lease sequence, we make + // sure the requested lease is Equivalent to current lease. + if replicaState.Lease.Sequence == requestedLease.Sequence { + // It is only possible for this to fail when expiration-based + // lease extensions are proposed concurrently. + leaseMismatch = !replicaState.Lease.Equivalent(requestedLease) + } + + // This is a check to see if the lease we proposed this lease request against is the same + // lease that we're trying to update. We need to check proposal timestamps because + // extensions don't increment sequence numbers. Without this check a lease could + // be extended and then another lease proposed against the original lease would + // be applied over the extension. + if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && + (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *replicaState.Lease.ProposedTS) { + leaseMismatch = true + } + } + } + if leaseMismatch { + log.VEventf( + ctx, 1, + "command proposed from replica %+v with lease #%d incompatible to %v", + raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *replicaState.Lease, + ) + if isLeaseRequest { + // For lease requests we return a special error that + // redirectOnOrAcquireLease() understands. Note that these + // requests don't go through the DistSender. + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "proposed under invalid lease", + }) + } + // We return a NotLeaseHolderError so that the DistSender retries. + nlhe := newNotLeaseHolderError( + replicaState.Lease, raftCmd.ProposerReplica.StoreID, replicaState.Desc) + nlhe.CustomMsg = fmt.Sprintf( + "stale proposal: command was proposed under lease #%d but is being applied "+ + "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease) + return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) + } + + if isLeaseRequest { + // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This + // makes sense since lease commands are proposed by anyone, so we can't expect a coherent + // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the + // counter makes sense from a testing perspective. + // + // However, leases get special vetting to make sure we don't give one to a replica that was + // since removed (see #15385 and a comment in redirectOnOrAcquireLease). 
+ if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "replica not part of range", + }) + } + } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { + // The happy case: the command is applying at or ahead of the minimal + // permissible index. It's ok if it skips a few slots (as can happen + // during rearrangement); this command will apply, but later ones which + // were proposed at lower indexes may not. Overall though, this is more + // stable and simpler than requiring commands to apply at their exact + // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 + // is otherwise tricky since we can't tell the client to try again + // (reproposals could exist and may apply at the right index, leading + // to a replay), and assigning the required index would be tedious + // seeing that it would have to rewind sometimes. + leaseIndex = raftCmd.MaxLeaseIndex + } else { + // The command is trying to apply at a past log position. That's + // unfortunate and hopefully rare; the client on the proposer will try + // again. Note that in this situation, the leaseIndex does not advance. + retry := proposalNoReevaluation + if proposedLocally { + log.VEventf( + ctx, 1, + "retry proposal %x: applied at lease index %d, required < %d", + proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, + ) + retry = proposalIllegalLeaseIndex + } + return leaseIndex, retry, roachpb.NewErrorf( + "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, + ) + } + return leaseIndex, proposalNoReevaluation, nil +} + +// applyRaftCommandToBatch applies a raft command from the replicated log to the +// current batch's view of the underlying state machine. When the state machine +// cannot be updated, an error (which is fatal!) is returned and must be treated +// that way by the caller. +func (r *Replica) applyRaftCommandToBatch( + ctx context.Context, + cmd *cmdAppCtx, + replicaState *storagepb.ReplicaState, + batch engine.Batch, + writeAppliedState bool, +) error { + writeBatch := cmd.raftCmd.WriteBatch + if writeBatch != nil && len(writeBatch.Data) > 0 { + mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) + if err != nil { + log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) + } else { + cmd.mutationCount = mutationCount + } + } + + // Exploit the fact that a split will result in a full stats + // recomputation to reset the ContainsEstimates flag. + // + // TODO(tschottdorf): We want to let the usual MVCCStats-delta + // machinery update our stats for the left-hand side. But there is no + // way to pass up an MVCCStats object that will clear out the + // ContainsEstimates flag. We should introduce one, but the migration + // makes this worth a separate effort (ContainsEstimates would need to + // have three possible values, 'UNCHANGED', 'NO', and 'YES'). + // Until then, we're left with this rather crude hack. + if cmd.replicatedResult().Split != nil { + replicaState.Stats.ContainsEstimates = false + } + ms := replicaState.Stats + + if cmd.e.Index != replicaState.RaftAppliedIndex+1 { + // If we have an out of order index, there's corruption. No sense in + // trying to update anything or running the command. Simply return + // a corruption error. 
+ return errors.Errorf("applied index jumped from %d to %d", + replicaState.RaftAppliedIndex, cmd.e.Index) + } + + if writeBatch != nil { + if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { + return errors.Wrap(err, "unable to apply WriteBatch") + } + } + + // The only remaining use of the batch is for range-local keys which we know + // have not been previously written within this batch. + // + // TODO(ajwerner): explore the costs and benefits of this use of Distinct(). + // This call requires flushing the batch's writes but should make subsequent + // writes somewhat cheaper. Early exploration showed no win but found that if + // a Distinct() batch could be used for all of command application it could + // be a win. At the time of writing that approach was deemed not safe. + writer := batch.Distinct() + + // Special-cased MVCC stats handling to exploit commutativity of stats delta + // upgrades. Thanks to commutativity, the spanlatch manager does not have to + // serialize on the stats key. + deltaStats := cmd.replicatedResult().Delta.ToStats() + usingAppliedStateKey := replicaState.UsingAppliedStateKey + needAppliedStateMigration := !usingAppliedStateKey && + cmd.replicatedResult().State != nil && + cmd.replicatedResult().State.UsingAppliedStateKey + if needAppliedStateMigration { + // The Raft command wants us to begin using the RangeAppliedState key + // and we haven't performed the migration yet. Delete the old keys + // that this new key is replacing. + err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) + if err != nil { + return errors.Wrap(err, "unable to migrate to range applied state") + } + usingAppliedStateKey = true + } + + if !writeAppliedState { + // Don't write any applied state, regardless of the technique we'd be using. + } else if usingAppliedStateKey { + // Note that calling ms.Add will never result in ms.LastUpdateNanos + // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos + // across all deltaStats). + ms.Add(deltaStats) + + // Set the range applied state, which includes the last applied raft and + // lease index along with the mvcc stats, all in one key. + if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, + cmd.e.Index, cmd.leaseIndex, ms); err != nil { + return errors.Wrap(err, "unable to set range applied state") + } + ms.Subtract(deltaStats) + } else { + // Advance the last applied index. We use a blind write in order to avoid + // reading the previous applied index keys on every write operation. This + // requires a little additional work in order maintain the MVCC stats. + var appliedIndexNewMS enginepb.MVCCStats + if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS, + cmd.e.Index, cmd.leaseIndex); err != nil { + return errors.Wrap(err, "unable to set applied index") + } + deltaStats.SysBytes += appliedIndexNewMS.SysBytes - + r.raftMu.stateLoader.CalcAppliedIndexSysBytes(replicaState.RaftAppliedIndex, replicaState.LeaseAppliedIndex) + + // Note that calling ms.Add will never result in ms.LastUpdateNanos + // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos + // across all deltaStats). + ms.Add(deltaStats) + if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, ms); err != nil { + return errors.Wrap(err, "unable to update MVCCStats") + } + ms.Subtract(deltaStats) + } + + // Close the Distinct() batch here now that we're done writing to it. 
+ writer.Close() + + // NB: it is not sane to use the distinct engine when updating truncated state + // as multiple commands in the same batch could modify that state and the + // below code reads before it writes. + haveTruncatedState := cmd.replicatedResult().State != nil && + cmd.replicatedResult().State.TruncatedState != nil + if haveTruncatedState { + apply, err := handleTruncatedStateBelowRaft(ctx, replicaState.TruncatedState, + cmd.replicatedResult().State.TruncatedState, r.raftMu.stateLoader, batch) + if err != nil { + return err + } + if !apply { + // The truncated state was discarded, so make sure we don't apply + // it to our in-memory state. + cmd.replicatedResult().State.TruncatedState = nil + cmd.replicatedResult().RaftLogDelta = 0 + // TODO(ajwerner): consider moving this code. + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + r.mu.Lock() + r.mu.raftLogSizeTrusted = false + r.mu.Unlock() + } + } + + start := timeutil.Now() + + var assertHS *raftpb.HardState + if util.RaceEnabled && cmd.replicatedResult().Split != nil { + rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID) + oldHS, err := rsl.LoadHardState(ctx, batch) + if err != nil { + return errors.Wrap(err, "unable to load HardState") + } + assertHS = &oldHS + } + + if assertHS != nil { + // Load the HardState that was just committed (if any). + rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID) + newHS, err := rsl.LoadHardState(ctx, batch) + if err != nil { + return errors.Wrap(err, "unable to load HardState") + } + // Assert that nothing moved "backwards". + if newHS.Term < assertHS.Term || + (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { + log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", + pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) + } + } + + // TODO(ajwerner): This metric no longer has anything to do with the "Commit" + // of anything. Unfortunately it's exposed in the admin ui of 19.1. We should + // remove it from the admin ui for 19.2 and then in 20.1 we can rename it to + // something more appropriate. + elapsed := timeutil.Since(start) + r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) + cmd.replicatedResult().Delta = deltaStats.ToStatsDelta() + return nil +} + +// applyCmdAppBatch handles the logic of writing a batch to the storage engine and +// applying it to the Replica state machine. This method clears b for reuse +// before returning. +func (r *Replica) applyCmdAppBatch( + ctx context.Context, b *cmdAppBatch, stats *handleCommittedEntriesStats, +) (errExpl string, err error) { + defer b.reset() + if log.V(4) { + log.Infof(ctx, "flushing batch %v of %d entries", b.replicaState, b.cmdBuf.len) + } + // Entry application is not done without syncing to disk. + // The atomicity guarantees of the batch and the fact that the applied state + // is stored in this batch, ensure that if the batch ends up not being durably + // committed then the entries in this batch will be applied again upon + // startup. 
+ if err := b.batch.Commit(false); err != nil { + log.Fatalf(ctx, "failed to commit Raft entry batch: %v", err) + } + b.batch.Close() + b.batch = nil + // NB: we compute the triviality of the batch here again rather than storing + // it as it may have changed during processing. In particular, the upgrade of + // applied state will have already occurred. + var batchIsNonTrivial bool + // Non-trivial entries are always alone in a batch and thus are last. + if cmd := b.cmdBuf.last(); !isTrivial(cmd.replicatedResult(), b.replicaState.UsingAppliedStateKey) { + batchIsNonTrivial = true + // Deal with locking sometimes associated with complex commands. + if unlock := cmd.splitMergeUnlock; unlock != nil { + defer unlock(cmd.replicatedResult()) + cmd.splitMergeUnlock = nil + } + if cmd.replicatedResult().BlockReads { + r.readOnlyCmdMu.Lock() + defer r.readOnlyCmdMu.Unlock() + cmd.replicatedResult().BlockReads = false + } + } + // Now that the batch is committed we can go about applying the side effects + // of the update to the truncated state. Note that this is safe only if the + // new truncated state is durably on disk (i.e.) synced. See #38566. + // + // NB: even if multiple updates to TruncatedState occurred in this batch we + // coalesce their side effects and only consult the latest + // TruncatedState.Index. + var sideloadTruncationDelta int64 + if b.updatedTruncatedState { + truncState := b.replicaState.TruncatedState + r.store.raftEntryCache.Clear(r.RangeID, truncState.Index+1) + log.VEventf(ctx, 1, "truncating sideloaded storage up to (and including) index %d", truncState.Index) + if sideloadTruncationDelta, _, err = r.raftMu.sideloaded.TruncateTo(ctx, truncState.Index+1); err != nil { + // We don't *have* to remove these entries for correctness. Log a + // loud error, but keep humming along. + log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) + } + } + + // Iterate through the cmds to compute the raft log delta and writeStats. + var mutationCount int + var it cmdAppCtxBufIterator + raftLogDelta := -sideloadTruncationDelta + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + raftLogDelta += cmd.replicatedResult().RaftLogDelta + cmd.replicatedResult().RaftLogDelta = 0 + mutationCount += cmd.mutationCount + } + // Record the write activity, passing a 0 nodeID because replica.writeStats + // intentionally doesn't track the origin of the writes. + r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) + // deltaStats will store the delta from the current state to the new state + // which will be used to update the metrics. + r.mu.Lock() + r.mu.state.RaftAppliedIndex = b.replicaState.RaftAppliedIndex + r.mu.state.LeaseAppliedIndex = b.replicaState.LeaseAppliedIndex + prevStats := *r.mu.state.Stats + *r.mu.state.Stats = *b.replicaState.Stats + if raftLogDelta != 0 { + r.mu.raftLogSize += raftLogDelta + if r.mu.raftLogSize < 0 { + r.mu.raftLogSize = 0 + } + r.mu.raftLogLastCheckSize += raftLogDelta + if r.mu.raftLogLastCheckSize < 0 { + r.mu.raftLogLastCheckSize = 0 + } + } + if b.updatedTruncatedState { + r.mu.state.TruncatedState = b.replicaState.TruncatedState + } + // Check the queuing conditions. 
+ checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize + needsSplitBySize := r.needsSplitBySizeRLocked() + needsMergeBySize := r.needsMergeBySizeRLocked() + r.mu.Unlock() + deltaStats := *b.replicaState.Stats + deltaStats.Subtract(prevStats) + r.store.metrics.addMVCCStats(deltaStats) + // NB: the bootstrap store has a nil split queue. + // TODO(tbg): the above is probably a lie now. + if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { + r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + // The bootstrap store has a nil merge queue. + // TODO(tbg): the above is probably a lie now. + if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { + // TODO(tbg): for ranges which are small but protected from merges by + // other means (zone configs etc), this is called on every command, and + // fires off a goroutine each time. Make this trigger (and potentially + // the split one above, though it hasn't been observed to be as + // bothersome) less aggressive. + r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + if checkRaftLog { + r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } + for ok := it.init(&b.cmdBuf); ok; ok = it.next() { + cmd := it.cur() + for _, sc := range cmd.replicatedResult().SuggestedCompactions { + r.store.compactor.Suggest(cmd.ctx, sc) + } + cmd.replicatedResult().SuggestedCompactions = nil + isNonTrivial := batchIsNonTrivial && it.isLast() + if errExpl, err = r.handleRaftCommandResult(ctx, cmd, isNonTrivial, + b.replicaState.UsingAppliedStateKey); err != nil { + return errExpl, err + } + if isNonTrivial { + stats.stateAssertions++ + } + stats.entriesProcessed++ + } + stats.batchesProcessed++ + return "", nil +} diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go new file mode 100644 index 000000000000..869a8e04d8bc --- /dev/null +++ b/pkg/storage/replica_application_result.go @@ -0,0 +1,430 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/kr/pretty" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +// isTrivial determines whether the side-effects of a ReplicatedEvalResult are +// "trivial". A result is fundamentally considered "trivial" if it does not have +// side effects which rely on the written state of the replica exactly matching +// the in-memory state of the replica at the corresponding log position. +// Non-trivial commands must be the last entry in a batch so that after +// batch is applied the replica's written and in-memory state correspond to that +// log index. 
+// +// At the time of writing it is possible that the current conditions are too +// strict but they are certainly sufficient. +func isTrivial(r *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool) (ret bool) { + // Check if there are any non-trivial State updates. + if r.State != nil { + stateWhitelist := *r.State + // An entry is non-trivial if an upgrade to UsingAppliedState is + // required. If we're already usingAppliedStateKey or this entry does + // not imply an upgrade then it is trivial. + if usingAppliedStateKey { + stateWhitelist.UsingAppliedStateKey = false + } + if stateWhitelist.TruncatedState != nil { + stateWhitelist.TruncatedState = nil + } + if stateWhitelist != (storagepb.ReplicaState{}) { + return false + } + } + // Set whitelist to the value of r and clear the whitelisted fields. + // If whitelist is zero-valued after clearing the whitelisted fields then + // it is trivial. + whitelist := *r + whitelist.Delta = enginepb.MVCCStatsDelta{} + whitelist.Timestamp = hlc.Timestamp{} + whitelist.DeprecatedDelta = nil + whitelist.PrevLeaseProposal = nil + whitelist.State = nil + whitelist.RaftLogDelta = 0 + whitelist.SuggestedCompactions = nil + return whitelist.Equal(storagepb.ReplicatedEvalResult{}) +} + +// stageTrivialReplicatedEvalResult applies the trivial portions of replicatedResult to +// the supplied replicaState and returns whether the change implied an update to +// the replica's truncated state. This function modifies replicaState but does +// not modify replicatedResult in order to give the TestingPostApplyFilter testing knob +// an opportunity to inspect the command's ReplicatedEvalResult. +func stageTrivialReplicatedEvalResult( + ctx context.Context, + replicatedResult *storagepb.ReplicatedEvalResult, + raftAppliedIndex, leaseAppliedIndex uint64, + replicaState *storagepb.ReplicaState, +) (truncatedStateUpdated bool) { + deltaStats := replicatedResult.Delta.ToStats() + replicaState.Stats.Add(deltaStats) + if raftAppliedIndex != 0 { + replicaState.RaftAppliedIndex = raftAppliedIndex + } + if leaseAppliedIndex != 0 { + replicaState.LeaseAppliedIndex = leaseAppliedIndex + } + haveState := replicatedResult.State != nil + truncatedStateUpdated = haveState && replicatedResult.State.TruncatedState != nil + if truncatedStateUpdated { + replicaState.TruncatedState = replicatedResult.State.TruncatedState + } + return truncatedStateUpdated +} + +// clearTrivialReplicatedEvalResultFields is used to zero out the fields of a +// ReplicatedEvalResult that have already been consumed when staging the +// corresponding command and applying it to the current batch's view of the +// ReplicaState. This function is called after a batch has been written to the +// storage engine. For trivial commands this function should result in a zero +// value replicatedResult. +func clearTrivialReplicatedEvalResultFields( + replicatedResult *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool, +) { + // Fields for which no action is taken in this method are zeroed so that + // they don't trigger an assertion at the end of the application process + // (which checks that all fields were handled). + replicatedResult.IsLeaseRequest = false + replicatedResult.Timestamp = hlc.Timestamp{} + replicatedResult.PrevLeaseProposal = nil + // The state fields cleared here were already applied to the in-memory view of + // replica state for this batch. 
+	if haveState := replicatedResult.State != nil; haveState {
+		replicatedResult.State.Stats = nil
+		replicatedResult.State.TruncatedState = nil
+
+		// If we're already using the AppliedStateKey then there's nothing
+		// to do. This flag is idempotent so it's ok that we see this flag
+		// multiple times, but we want to make sure it doesn't cause us to
+		// perform repeated state assertions, so clear it before the
+		// shouldAssert determination.
+		// A reader might wonder if using the value of usingAppliedStateKey from
+		// after applying an entire batch is valid to determine whether this command
+		// implied a transition, but if it had implied a transition then the batch
+		// would not have been considered trivial and the current view of the flag
+		// will still be false as complex state transitions are handled after this call.
+		if usingAppliedStateKey {
+			replicatedResult.State.UsingAppliedStateKey = false
+		}
+		// ReplicaState.Stats was previously non-nullable which caused nodes to
+		// send a zero-value MVCCStats structure. If the proposal was generated by
+		// an old node, we'll have decoded that zero-value structure setting
+		// ReplicaState.Stats to a non-nil value which would trigger the "unhandled
+		// field in ReplicatedEvalResult" assertion to fire if we didn't clear it.
+		// TODO(ajwerner): eliminate this case that likely can no longer occur as of
+		// at least 19.1.
+		if replicatedResult.State.Stats != nil && (*replicatedResult.State.Stats == enginepb.MVCCStats{}) {
+			replicatedResult.State.Stats = nil
+		}
+		if *replicatedResult.State == (storagepb.ReplicaState{}) {
+			replicatedResult.State = nil
+		}
+	}
+	replicatedResult.Delta = enginepb.MVCCStatsDelta{}
+}
+
+// handleRaftCommandResult is called after the current cmd's batch has been
+// committed to the storage engine and the trivial side-effects have been
+// applied to the Replica's in-memory state. This method deals with applying
+// non-trivial side effects, notifying waiting clients, releasing latches,
+// informing raft about applied config changes, and asserting hard state as
+// required.
+func (r *Replica) handleRaftCommandResult(
+	ctx context.Context, cmd *cmdAppCtx, isNonTrivial, usingAppliedStateKey bool,
+) (errExpl string, err error) {
+	// Set up the local result prior to handling the ReplicatedEvalResult to
+	// give testing knobs an opportunity to inspect it.
+	r.prepareLocalResult(cmd.ctx, cmd)
+	// Handle the ReplicatedEvalResult, executing any side effects of the last
+	// state machine transition.
+	//
+	// Note that this must happen after committing (the engine.Batch), but
+	// before notifying a potentially waiting client.
+	clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey)
+	if isNonTrivial {
+		r.handleComplexReplicatedEvalResult(cmd.ctx, *cmd.replicatedResult())
+	} else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) {
+		log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v",
+			cmd.replicatedResult())
+	}
+
+	// NB: Perform state assertion before acknowledging the client.
+	// Some tests (TestRangeStatsInit) assume that once the store has started
+	// and the first range has a lease, there will not be a later hard-state.
+	if isNonTrivial {
+		// Assert that the on-disk state doesn't diverge from the in-memory
+		// state as a result of the side effects.
+ r.mu.Lock() + r.assertStateLocked(ctx, r.store.Engine()) + r.mu.Unlock() + } + + if cmd.localResult != nil { + r.handleLocalEvalResult(cmd.ctx, *cmd.localResult) + } + r.finishRaftCommand(cmd.ctx, cmd) + switch cmd.e.Type { + case raftpb.EntryNormal: + if cmd.replicatedResult().ChangeReplicas != nil { + log.Fatalf(cmd.ctx, "unexpected replication change from command %s", &cmd.raftCmd) + } + case raftpb.EntryConfChange: + if cmd.replicatedResult().ChangeReplicas == nil { + cmd.cc = raftpb.ConfChange{} + } + if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + raftGroup.ApplyConfChange(cmd.cc) + return true, nil + }); err != nil { + const errExpl = "during ApplyConfChange" + return errExpl, errors.Wrap(err, errExpl) + } + } + return "", nil +} + +// handleComplexReplicatedEvalResult carries out the side-effects of non-trivial +// commands. It is run with the raftMu locked. It is illegal to pass a +// replicatedResult that does not imply any side-effects. +func (r *Replica) handleComplexReplicatedEvalResult( + ctx context.Context, replicatedResult storagepb.ReplicatedEvalResult, +) { + + // Assert that this replicatedResult implies at least one side-effect. + if replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) { + log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleComplexReplicatedEvalResult") + } + + // Process Split or Merge. This needs to happen after stats update because + // of the ContainsEstimates hack. + if replicatedResult.Split != nil { + splitPostApply( + r.AnnotateCtx(ctx), + replicatedResult.Split.RHSDelta, + &replicatedResult.Split.SplitTrigger, + r, + ) + replicatedResult.Split = nil + } + + if replicatedResult.Merge != nil { + if err := r.store.MergeRange( + ctx, r, replicatedResult.Merge.LeftDesc, replicatedResult.Merge.RightDesc, replicatedResult.Merge.FreezeStart, + ); err != nil { + // Our in-memory state has diverged from the on-disk state. + log.Fatalf(ctx, "failed to update store after merging range: %s", err) + } + replicatedResult.Merge = nil + } + + // Update the remaining ReplicaState. 
+	if replicatedResult.State != nil {
+		if newDesc := replicatedResult.State.Desc; newDesc != nil {
+			r.setDesc(ctx, newDesc)
+			replicatedResult.State.Desc = nil
+		}
+
+		if newLease := replicatedResult.State.Lease; newLease != nil {
+			r.leasePostApply(ctx, *newLease, false /* permitJump */)
+			replicatedResult.State.Lease = nil
+		}
+
+		if newThresh := replicatedResult.State.GCThreshold; newThresh != nil {
+			if (*newThresh != hlc.Timestamp{}) {
+				r.mu.Lock()
+				r.mu.state.GCThreshold = newThresh
+				r.mu.Unlock()
+			}
+			replicatedResult.State.GCThreshold = nil
+		}
+
+		if newThresh := replicatedResult.State.TxnSpanGCThreshold; newThresh != nil {
+			if (*newThresh != hlc.Timestamp{}) {
+				r.mu.Lock()
+				r.mu.state.TxnSpanGCThreshold = newThresh
+				r.mu.Unlock()
+			}
+			replicatedResult.State.TxnSpanGCThreshold = nil
+		}
+
+		if replicatedResult.State.UsingAppliedStateKey {
+			r.mu.Lock()
+			r.mu.state.UsingAppliedStateKey = true
+			r.mu.Unlock()
+			replicatedResult.State.UsingAppliedStateKey = false
+		}
+
+		if (*replicatedResult.State == storagepb.ReplicaState{}) {
+			replicatedResult.State = nil
+		}
+	}
+
+	if change := replicatedResult.ChangeReplicas; change != nil {
+		if change.ChangeType == roachpb.REMOVE_REPLICA &&
+			r.store.StoreID() == change.Replica.StoreID {
+			// This wants to run as late as possible, maximizing the chances
+			// that the other nodes have finished this command as well (since
+			// processing the removal from the queue looks up the Range at the
+			// lease holder, being too early here turns this into a no-op).
+			r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved)
+		}
+		replicatedResult.ChangeReplicas = nil
+	}
+
+	if replicatedResult.ComputeChecksum != nil {
+		r.computeChecksumPostApply(ctx, *replicatedResult.ComputeChecksum)
+		replicatedResult.ComputeChecksum = nil
+	}
+
+	if !replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) {
+		log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(replicatedResult, storagepb.ReplicatedEvalResult{}))
+	}
+}
+
+// prepareLocalResult is performed after the command has been committed to the
+// engine but before its side-effects have been applied to the Replica's
+// in-memory state. This method gives the command an opportunity to interact
+// with testing knobs and to set up its local result if it was proposed
+// locally. This is performed prior to handling the command's
+// ReplicatedEvalResult because the process of handling the replicated eval
+// result will zero-out the struct to ensure that it has properly performed all
+// of the implied side-effects.
+func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) {
+	var pErr *roachpb.Error
+	if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; filter != nil {
+		var newPropRetry int
+		newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{
+			CmdID:                cmd.idKey,
+			ReplicatedEvalResult: *cmd.replicatedResult(),
+			StoreID:              r.store.StoreID(),
+			RangeID:              r.RangeID,
+		})
+		if cmd.proposalRetry == 0 {
+			cmd.proposalRetry = proposalReevaluationReason(newPropRetry)
+		}
+		// calling maybeSetCorrupt here is mostly for tests and looks. The
+		// interesting errors originate in applyRaftCommandToBatch, and they are
+		// already handled above.
+		pErr = r.maybeSetCorrupt(ctx, pErr)
+	}
+	if pErr == nil {
+		pErr = cmd.forcedErr
+	}
+
+	if cmd.proposedLocally() {
+		if cmd.proposalRetry != proposalNoReevaluation && pErr == nil {
+			log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", cmd.proposal)
+		}
+		if pErr != nil {
+			// A forced error was set (i.e. we did not apply the proposal,
+			// for instance due to its log position) or the Replica is now
+			// corrupted.
+			// If proposalRetry is set, we don't also return an error, as per the
+			// proposalResult contract.
+			if cmd.proposalRetry == proposalNoReevaluation {
+				cmd.response.Err = pErr
+			}
+		} else if cmd.proposal.Local.Reply != nil {
+			cmd.response.Reply = cmd.proposal.Local.Reply
+		} else {
+			log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd.proposal)
+		}
+		cmd.response.Intents = cmd.proposal.Local.DetachIntents()
+		cmd.response.EndTxns = cmd.proposal.Local.DetachEndTxns(pErr != nil)
+		if pErr == nil {
+			cmd.localResult = cmd.proposal.Local
+		}
+	}
+	if pErr != nil && cmd.localResult != nil {
+		log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr)
+	}
+	if log.ExpensiveLogEnabled(ctx, 2) {
+		log.VEvent(ctx, 2, cmd.localResult.String())
+	}
+}
+
+// finishRaftCommand is called after a command's side effects have been applied
+// in order to acknowledge clients and release latches.
+func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
+
+	// Provide the command's corresponding logical operations to the
+	// Replica's rangefeed. Only do so if the WriteBatch is non-nil,
+	// otherwise it's valid for the logical op log to be nil, which
+	// would shut down all rangefeeds. If no rangefeed is running,
+	// this call will be a no-op.
+	if cmd.raftCmd.WriteBatch != nil {
+		r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog)
+	} else if cmd.raftCmd.LogicalOpLog != nil {
+		log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd)
+	}
+
+	// When set to true, recomputes the stats for the LHS and RHS of splits and
+	// makes sure that they agree with the state's range stats.
+	const expensiveSplitAssertion = false
+
+	if expensiveSplitAssertion && cmd.replicatedResult().Split != nil {
+		split := cmd.replicatedResult().Split
+		lhsStatsMS := r.GetMVCCStats()
+		lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		rhsStatsMS := rightReplica.GetMVCCStats()
+		rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos)
+		if err != nil {
+			log.Fatal(ctx, err)
+		}
+
+		if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS))
+		}
+		if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 {
+			log.Fatalf(ctx, "RHS split stats divergence diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS))
+		}
+	}
+
+	if cmd.proposedLocally() {
+		// If we failed to apply at the right lease index, try again with
+		// a new one. This is important for pipelined writes, since they
+		// don't have a client watching to retry, so a failure to
+		// eventually apply the proposal would be a user-visible error.
+ // TODO(nvanbenschoten): This reproposal is not tracked by the + // quota pool. We should fix that. + if cmd.proposalRetry == proposalIllegalLeaseIndex && + r.tryReproposeWithNewLeaseIndex(cmd.proposal) { + return + } + // Otherwise, signal the command's status to the client. + cmd.proposal.finishApplication(cmd.response) + } else if cmd.response.Err != nil { + log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err) + } +} diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index bc6ac6416e86..f1f236e41be9 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -531,254 +531,6 @@ func addSSTablePreApply( return copied } -func (r *Replica) handleReplicatedEvalResult( - ctx context.Context, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, -) (shouldAssert bool) { - // Fields for which no action is taken in this method are zeroed so that - // they don't trigger an assertion at the end of the method (which checks - // that all fields were handled). - { - rResult.IsLeaseRequest = false - rResult.Timestamp = hlc.Timestamp{} - rResult.PrevLeaseProposal = nil - } - - if rResult.BlockReads { - r.readOnlyCmdMu.Lock() - defer r.readOnlyCmdMu.Unlock() - rResult.BlockReads = false - } - - // Update MVCC stats and Raft portion of ReplicaState. - deltaStats := rResult.Delta.ToStats() - r.mu.Lock() - r.mu.state.Stats.Add(deltaStats) - if raftAppliedIndex != 0 { - r.mu.state.RaftAppliedIndex = raftAppliedIndex - } - if leaseAppliedIndex != 0 { - r.mu.state.LeaseAppliedIndex = leaseAppliedIndex - } - needsSplitBySize := r.needsSplitBySizeRLocked() - needsMergeBySize := r.needsMergeBySizeRLocked() - r.mu.Unlock() - - r.store.metrics.addMVCCStats(deltaStats) - rResult.Delta = enginepb.MVCCStatsDelta{} - - // NB: the bootstrap store has a nil split queue. - // TODO(tbg): the above is probably a lie now. - if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { - r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - - // The bootstrap store has a nil merge queue. - // TODO(tbg): the above is probably a lie now. - if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { - // TODO(tbg): for ranges which are small but protected from merges by - // other means (zone configs etc), this is called on every command, and - // fires off a goroutine each time. Make this trigger (and potentially - // the split one above, though it hasn't been observed to be as - // bothersome) less aggressive. - r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - - // The above are always present. The following are not always present but - // should not trigger a ReplicaState assertion because they are either too - // frequent to do so or because they do not change the ReplicaState. - - if rResult.State != nil { - // Raft log truncation is too frequent to justify a replica state - // assertion. - if newTruncState := rResult.State.TruncatedState; newTruncState != nil { - rResult.State.TruncatedState = nil // for assertion - - r.mu.Lock() - r.mu.state.TruncatedState = newTruncState - r.mu.Unlock() - - // Clear any entries in the Raft log entry cache for this range up - // to and including the most recently truncated index. - r.store.raftEntryCache.Clear(r.RangeID, newTruncState.Index+1) - - // Truncate the sideloaded storage. 
Note that this is safe only if the new truncated state - // is durably on disk (i.e.) synced. This is true at the time of writing but unfortunately - // could rot. - { - log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index) - if size, _, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { - // We don't *have* to remove these entries for correctness. Log a - // loud error, but keep humming along. - log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) - } else { - rResult.RaftLogDelta -= size - } - } - } - - // ReplicaState.Stats was previously non-nullable which caused nodes to - // send a zero-value MVCCStats structure. If the proposal was generated by - // an old node, we'll have decoded that zero-value structure setting - // ReplicaState.Stats to a non-nil value which would trigger the "unhandled - // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. - if rResult.State.Stats != nil && (*rResult.State.Stats == enginepb.MVCCStats{}) { - rResult.State.Stats = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - // If we're already using the AppliedStateKey then there's nothing - // to do. This flag is idempotent so it's ok that we see this flag - // multiple times, but we want to make sure it doesn't cause us to - // perform repeated state assertions, so clear it before the - // shouldAssert determination. - if r.mu.state.UsingAppliedStateKey { - rResult.State.UsingAppliedStateKey = false - } - r.mu.Unlock() - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if rResult.RaftLogDelta != 0 { - r.mu.Lock() - r.mu.raftLogSize += rResult.RaftLogDelta - r.mu.raftLogLastCheckSize += rResult.RaftLogDelta - // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted - // between server restarts. - if r.mu.raftLogSize < 0 { - r.mu.raftLogSize = 0 - } - if r.mu.raftLogLastCheckSize < 0 { - r.mu.raftLogLastCheckSize = 0 - } - r.mu.Unlock() - rResult.RaftLogDelta = 0 - } else { - // Check for whether to queue the range for Raft log truncation if this is - // not a Raft log truncation command itself. We don't want to check the - // Raft log for truncation on every write operation or even every operation - // which occurs after the Raft log exceeds RaftLogQueueStaleSize. The logic - // below queues the replica for possible Raft log truncation whenever an - // additional RaftLogQueueStaleSize bytes have been written to the Raft - // log. - r.mu.Lock() - checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize - if checkRaftLog { - r.mu.raftLogLastCheckSize = r.mu.raftLogSize - } - r.mu.Unlock() - if checkRaftLog { - r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - } - - for _, sc := range rResult.SuggestedCompactions { - r.store.compactor.Suggest(ctx, sc) - } - rResult.SuggestedCompactions = nil - - // The rest of the actions are "nontrivial" and may have large effects on the - // in-memory and on-disk ReplicaStates. If any of these actions are present, - // we want to assert that these two states do not diverge. - shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) - - // Process Split or Merge. This needs to happen after stats update because - // of the ContainsEstimates hack. 
- - if rResult.Split != nil { - splitPostApply( - r.AnnotateCtx(ctx), - rResult.Split.RHSDelta, - &rResult.Split.SplitTrigger, - r, - ) - rResult.Split = nil - } - - if rResult.Merge != nil { - if err := r.store.MergeRange( - ctx, r, rResult.Merge.LeftDesc, rResult.Merge.RightDesc, rResult.Merge.FreezeStart, - ); err != nil { - // Our in-memory state has diverged from the on-disk state. - log.Fatalf(ctx, "failed to update store after merging range: %+v", err) - } - rResult.Merge = nil - } - - // Update the remaining ReplicaState. - - if rResult.State != nil { - if newDesc := rResult.State.Desc; newDesc != nil { - r.setDesc(ctx, newDesc) - rResult.State.Desc = nil - } - - if newLease := rResult.State.Lease; newLease != nil { - r.leasePostApply(ctx, *newLease, false /* permitJump */) - rResult.State.Lease = nil - } - - if newThresh := rResult.State.GCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.GCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.GCThreshold = nil - } - - if newThresh := rResult.State.TxnSpanGCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.TxnSpanGCThreshold = newThresh - r.mu.Unlock() - } - rResult.State.TxnSpanGCThreshold = nil - } - - if rResult.State.UsingAppliedStateKey { - r.mu.Lock() - r.mu.state.UsingAppliedStateKey = true - r.mu.Unlock() - rResult.State.UsingAppliedStateKey = false - } - - if (*rResult.State == storagepb.ReplicaState{}) { - rResult.State = nil - } - } - - if change := rResult.ChangeReplicas; change != nil { - if change.ChangeType == roachpb.REMOVE_REPLICA && - r.store.StoreID() == change.Replica.StoreID { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - // Lock ordering dictates that we don't hold any mutexes when adding, - // so we fire it off in a task. - r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) - } - rResult.ChangeReplicas = nil - } - - if rResult.ComputeChecksum != nil { - r.computeChecksumPostApply(ctx, *rResult.ComputeChecksum) - rResult.ComputeChecksum = nil - } - - if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) - } - return shouldAssert -} - func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.LocalResult) { // Fields for which no action is taken in this method are zeroed so that // they don't trigger an assertion at the end of the method (which checks @@ -861,25 +613,6 @@ func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.Loca } } -func (r *Replica) handleEvalResultRaftMuLocked( - ctx context.Context, - lResult *result.LocalResult, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, -) { - shouldAssert := r.handleReplicatedEvalResult(ctx, rResult, raftAppliedIndex, leaseAppliedIndex) - if lResult != nil { - r.handleLocalEvalResult(ctx, *lResult) - } - if shouldAssert { - // Assert that the on-disk state doesn't diverge from the in-memory - // state as a result of the side effects. - r.mu.Lock() - r.assertStateLocked(ctx, r.store.Engine()) - r.mu.Unlock() - } -} - // proposalResult indicates the result of a proposal. 
Exactly one of // Reply and Err is set, and it represents the result of the proposal. type proposalResult struct { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index c1515e125994..a563c0acc0f1 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -20,10 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" @@ -37,7 +34,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/kr/pretty" "github.com/pkg/errors" "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" @@ -375,7 +371,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { } type handleRaftReadyStats struct { - processed int + handleCommittedEntriesStats } // noSnap can be passed to handleRaftReady when no snapshot should be processed. @@ -699,92 +695,35 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) r.sendRaftMessages(ctx, otherMsgs) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") - applicationStart := timeutil.Now() - for _, e := range rd.CommittedEntries { - switch e.Type { - case raftpb.EntryNormal: - // NB: Committed entries are handed to us by Raft. Raft does not - // know about sideloading. Consequently the entries here are all - // already inlined. - - var commandID storagebase.CmdIDKey - var command storagepb.RaftCommand - - // Process committed entries. etcd raft occasionally adds a nil entry - // (our own commands are never empty). This happens in two situations: - // When a new leader is elected, and when a config change is dropped due - // to the "one at a time" rule. In both cases we may need to resubmit our - // pending proposals (In the former case we resubmit everything because - // we proposed them to a former leader that is no longer able to commit - // them. In the latter case we only need to resubmit pending config - // changes, but it's hard to distinguish so we resubmit everything - // anyway). We delay resubmission until after we have processed the - // entire batch of entries. - if len(e.Data) == 0 { - // Overwrite unconditionally since this is the most aggressive - // reproposal mode. - if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { - refreshReason = reasonNewLeaderOrConfigChange - } - commandID = "" // special-cased value, command isn't used - } else { - var encodedCommand []byte - commandID, encodedCommand = DecodeRaftCommand(e.Data) - // An empty command is used to unquiesce a range and wake the - // leader. Clear commandID so it's ignored for processing. 
- if len(encodedCommand) == 0 { - commandID = "" - } else if err := protoutil.Unmarshal(encodedCommand, &command); err != nil { - const expl = "while unmarshalling entry" - return stats, expl, errors.Wrap(err, expl) - } - } - - if changedRepl := r.processRaftCommand(ctx, commandID, e.Term, e.Index, command); changedRepl { - log.Fatalf(ctx, "unexpected replication change from command %s", &command) - } - r.store.metrics.RaftCommandsApplied.Inc(1) - stats.processed++ - - case raftpb.EntryConfChange: - var cc raftpb.ConfChange - if err := protoutil.Unmarshal(e.Data, &cc); err != nil { - const expl = "while unmarshaling ConfChange" - return stats, expl, errors.Wrap(err, expl) - } - var ccCtx ConfChangeContext - if err := protoutil.Unmarshal(cc.Context, &ccCtx); err != nil { - const expl = "while unmarshaling ConfChangeContext" - return stats, expl, errors.Wrap(err, expl) + applicationStart := timeutil.Now() + if len(rd.CommittedEntries) > 0 { + var expl string + stats.handleCommittedEntriesStats, expl, err = + r.handleCommittedEntriesRaftMuLocked(ctx, rd.CommittedEntries) + if err != nil { + return stats, expl, err + } + // etcd raft occasionally adds a nil entry (our own commands are never + // empty). This happens in two situations: When a new leader is elected, and + // when a config change is dropped due to the "one at a time" rule. In both + // cases we may need to resubmit our pending proposals (In the former case + // we resubmit everything because we proposed them to a former leader that + // is no longer able to commit them. In the latter case we only need to + // resubmit pending config changes, but it's hard to distinguish so we + // resubmit everything anyway). We delay resubmission until after we have + // processed the entire batch of entries. + if stats.numEmptyEntries > 0 { + // Overwrite unconditionally since this is the most aggressive + // reproposal mode. + if !r.store.TestingKnobs().DisableRefreshReasonNewLeaderOrConfigChange { + refreshReason = reasonNewLeaderOrConfigChange } - var command storagepb.RaftCommand - if err := protoutil.Unmarshal(ccCtx.Payload, &command); err != nil { - const expl = "while unmarshaling RaftCommand" - return stats, expl, errors.Wrap(err, expl) - } - commandID := storagebase.CmdIDKey(ccCtx.CommandID) - if changedRepl := r.processRaftCommand( - ctx, commandID, e.Term, e.Index, command, - ); !changedRepl { - // If we did not apply the config change, tell raft that the config change was aborted. 
- cc = raftpb.ConfChange{} - } - stats.processed++ - - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.ApplyConfChange(cc) - return true, nil - }); err != nil { - const expl = "during ApplyConfChange" - return stats, expl, errors.Wrap(err, expl) - } - default: - log.Fatalf(ctx, "unexpected Raft entry: %v", e) } } applicationElapsed := timeutil.Since(applicationStart).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) + if refreshReason != noReason { r.mu.Lock() r.refreshProposalsLocked(0, refreshReason) @@ -1196,151 +1135,6 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID } } -func (r *Replica) checkForcedErrLocked( - ctx context.Context, - idKey storagebase.CmdIDKey, - raftCmd storagepb.RaftCommand, - proposal *ProposalData, - proposedLocally bool, -) (uint64, proposalReevaluationReason, *roachpb.Error) { - leaseIndex := r.mu.state.LeaseAppliedIndex - - isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest - var requestedLease roachpb.Lease - if isLeaseRequest { - requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease - } - if idKey == "" { - // This is an empty Raft command (which is sent by Raft after elections - // to trigger reproposals or during concurrent configuration changes). - // Nothing to do here except making sure that the corresponding batch - // (which is bogus) doesn't get executed (for it is empty and so - // properties like key range are undefined). - return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") - } - - // Verify the lease matches the proposer's expectation. We rely on - // the proposer's determination of whether the existing lease is - // held, and can be used, or is expired, and can be replaced. - // Verify checks that the lease has not been modified since proposal - // due to Raft delays / reorderings. - // To understand why this lease verification is necessary, see comments on the - // proposer_lease field in the proto. - leaseMismatch := false - if raftCmd.DeprecatedProposerLease != nil { - // VersionLeaseSequence must not have been active when this was proposed. - // - // This does not prevent the lease race condition described below. The - // reason we don't fix this here as well is because fixing the race - // requires a new cluster version which implies that we'll already be - // using lease sequence numbers and will fall into the case below. - leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*r.mu.state.Lease) - } else { - leaseMismatch = raftCmd.ProposerLeaseSequence != r.mu.state.Lease.Sequence - if !leaseMismatch && isLeaseRequest { - // Lease sequence numbers are a reflection of lease equivalency - // between subsequent leases. However, Lease.Equivalent is not fully - // symmetric, meaning that two leases may be Equivalent to a third - // lease but not Equivalent to each other. If these leases are - // proposed under that same third lease, neither will be able to - // detect whether the other has applied just by looking at the - // current lease sequence number because neither will will increment - // the sequence number. - // - // This can lead to inversions in lease expiration timestamps if - // we're not careful. To avoid this, if a lease request's proposer - // lease sequence matches the current lease sequence and the current - // lease sequence also matches the requested lease sequence, we make - // sure the requested lease is Equivalent to current lease. 
- if r.mu.state.Lease.Sequence == requestedLease.Sequence { - // It is only possible for this to fail when expiration-based - // lease extensions are proposed concurrently. - leaseMismatch = !r.mu.state.Lease.Equivalent(requestedLease) - } - - // This is a check to see if the lease we proposed this lease request against is the same - // lease that we're trying to update. We need to check proposal timestamps because - // extensions don't increment sequence numbers. Without this check a lease could - // be extended and then another lease proposed against the original lease would - // be applied over the extension. - if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && - (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *r.mu.state.Lease.ProposedTS) { - leaseMismatch = true - } - } - } - if leaseMismatch { - log.VEventf( - ctx, 1, - "command proposed from replica %+v with lease #%d incompatible to %v", - raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *r.mu.state.Lease, - ) - if isLeaseRequest { - // For lease requests we return a special error that - // redirectOnOrAcquireLease() understands. Note that these - // requests don't go through the DistSender. - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "proposed under invalid lease", - }) - } - // We return a NotLeaseHolderError so that the DistSender retries. - nlhe := newNotLeaseHolderError( - r.mu.state.Lease, raftCmd.ProposerReplica.StoreID, r.mu.state.Desc) - nlhe.CustomMsg = fmt.Sprintf( - "stale proposal: command was proposed under lease #%d but is being applied "+ - "under lease: %s", raftCmd.ProposerLeaseSequence, r.mu.state.Lease) - return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) - } - - if isLeaseRequest { - // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This - // makes sense since lease commands are proposed by anyone, so we can't expect a coherent - // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the - // counter makes sense from a testing perspective. - // - // However, leases get special vetting to make sure we don't give one to a replica that was - // since removed (see #15385 and a comment in redirectOnOrAcquireLease). - if _, ok := r.mu.state.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *r.mu.state.Lease, - Requested: requestedLease, - Message: "replica not part of range", - }) - } - } else if r.mu.state.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { - // The happy case: the command is applying at or ahead of the minimal - // permissible index. It's ok if it skips a few slots (as can happen - // during rearrangement); this command will apply, but later ones which - // were proposed at lower indexes may not. Overall though, this is more - // stable and simpler than requiring commands to apply at their exact - // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 - // is otherwise tricky since we can't tell the client to try again - // (reproposals could exist and may apply at the right index, leading - // to a replay), and assigning the required index would be tedious - // seeing that it would have to rewind sometimes. - leaseIndex = raftCmd.MaxLeaseIndex - } else { - // The command is trying to apply at a past log position. 
That's - // unfortunate and hopefully rare; the client on the proposer will try - // again. Note that in this situation, the leaseIndex does not advance. - retry := proposalNoReevaluation - if proposedLocally { - log.VEventf( - ctx, 1, - "retry proposal %x: applied at lease index %d, required < %d", - proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, - ) - retry = proposalIllegalLeaseIndex - } - return leaseIndex, retry, roachpb.NewErrorf( - "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, - ) - } - return leaseIndex, proposalNoReevaluation, nil -} - type snapTruncationInfo struct { index uint64 deadline time.Time @@ -1591,383 +1385,6 @@ func (m lastUpdateTimesMap) isFollowerActive( return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration } -// processRaftCommand handles the complexities involved in moving the Raft -// state of a Replica forward. At a high level, it receives a proposal, which -// contains the evaluation of a batch (at its heart a WriteBatch, to be applied -// to the underlying storage engine), which it applies and for which it signals -// the client waiting for it (if it's waiting on this Replica). -// -// The proposal also contains auxiliary data which needs to be verified in order -// to decide whether the proposal should be applied: the command's MaxLeaseIndex -// must move the state machine's LeaseAppliedIndex forward, and the proposer's -// lease (or rather its sequence number) must match that of the state machine. -// Furthermore, the GCThreshold is validated and it is checked whether the -// request's key span is contained in the Replica's (it is unclear whether all -// of these checks are necessary). If any of the checks fail, the proposal's -// content is wiped and we apply an empty log entry instead, returning an error -// to the caller to handle. The two typical cases are the lease mismatch (in -// which case the caller tries to send the command to the actual leaseholder) -// and violations of the LeaseAppliedIndex (in which the caller tries again). -// -// Assuming all checks were passed, the command should be applied to the engine, -// which is done by the aptly named applyRaftCommand. -// -// For simple proposals this is the whole story, but some commands trigger -// additional code in this method. The standard way in which this is triggered -// is via a side effect communicated in the proposal's ReplicatedEvalResult and, -// for local proposals, the LocalEvalResult. These might, for example, trigger -// an update of the Replica's in-memory state to match updates to the on-disk -// state, or pass intents to the intent resolver. Some commands don't fit this -// simple schema and need to hook deeper into the code. Notably splits and merges -// need to acquire locks on their right-hand side Replicas and may need to add -// data to the WriteBatch before it is applied; similarly, changes to the disk -// layout of internal state typically require a migration which shows up here. -// -// This method returns true if the command successfully applied a replica -// change. 
-func (r *Replica) processRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - term, raftIndex uint64, - raftCmd storagepb.RaftCommand, -) (changedRepl bool) { - if raftIndex == 0 { - log.Fatalf(ctx, "processRaftCommand requires a non-zero index") - } - - if log.V(4) { - log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", idKey, raftCmd.MaxLeaseIndex) - } - - var ts hlc.Timestamp - if idKey != "" { - ts = raftCmd.ReplicatedEvalResult.Timestamp - } - - r.mu.Lock() - proposal, proposedLocally := r.mu.proposals[idKey] - - // TODO(tschottdorf): consider the Trace situation here. - if proposedLocally { - // We initiated this command, so use the caller-supplied context. - ctx = proposal.ctx - delete(r.mu.proposals, idKey) - - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized. - if r.mu.proposalQuota != nil { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, proposal.quotaSize) - } - } - - leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally) - - r.mu.Unlock() - - if forcedErr == nil { - // Verify that the batch timestamp is after the GC threshold. This is - // necessary because not all commands declare read access on the GC - // threshold key, even though they implicitly depend on it. This means - // that access to this state will not be serialized by latching, - // so we must perform this check upstream and downstream of raft. - // See #14833. - // - // We provide an empty key span because we already know that the Raft - // command is allowed to apply within its key range. This is guaranteed - // by checks upstream of Raft, which perform the same validation, and by - // span latches, which assure that any modifications to the range's - // boundaries will be serialized with this command. Finally, the - // leaseAppliedIndex check in checkForcedErrLocked ensures that replays - // outside of the spanlatch manager's control which break this - // serialization ordering will already by caught and an error will be - // thrown. - forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) - } - - // applyRaftCommand will return "expected" errors, but may also indicate - // replica corruption (as of now, signaled by a replicaCorruptionError). - // We feed its return through maybeSetCorrupt to act when that happens. - if forcedErr != nil { - log.VEventf(ctx, 1, "applying command with forced error: %s", forcedErr) - } else { - log.Event(ctx, "applying command") - - if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, raftCmd); err != nil { - log.Eventf(ctx, "unable to acquire split lock: %s", err) - // Send a crash report because a former bug in the error handling might have - // been the root cause of #19172. - _ = r.store.stopper.RunAsyncTask(ctx, "crash report", func(ctx context.Context) { - log.SendCrashReport( - ctx, - &r.store.cfg.Settings.SV, - 0, // depth - "while acquiring split lock: %s", - []interface{}{err}, - log.ReportTypeError, - ) - }) - - forcedErr = roachpb.NewError(err) - } else if splitMergeUnlock != nil { - // Close over raftCmd to capture its value at execution time; we clear - // ReplicatedEvalResult on certain errors. 
- defer func() { - splitMergeUnlock(raftCmd.ReplicatedEvalResult) - }() - } - } - - var response proposalResult - var writeBatch *storagepb.WriteBatch - { - if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; forcedErr == nil && filter != nil { - var newPropRetry int - newPropRetry, forcedErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - } - - if forcedErr != nil { - // Apply an empty entry. - raftCmd.ReplicatedEvalResult = storagepb.ReplicatedEvalResult{} - raftCmd.WriteBatch = nil - raftCmd.LogicalOpLog = nil - } - - // Update the node clock with the serviced request. This maintains - // a high water mark for all ops serviced, so that received ops without - // a timestamp specified are guaranteed one higher than any op already - // executed for overlapping keys. - r.store.Clock().Update(ts) - - var pErr *roachpb.Error - if raftCmd.WriteBatch != nil { - writeBatch = raftCmd.WriteBatch - } - - if deprecatedDelta := raftCmd.ReplicatedEvalResult.DeprecatedDelta; deprecatedDelta != nil { - raftCmd.ReplicatedEvalResult.Delta = deprecatedDelta.ToStatsDelta() - raftCmd.ReplicatedEvalResult.DeprecatedDelta = nil - } - - // AddSSTable ingestions run before the actual batch. This makes sure - // that when the Raft command is applied, the ingestion has definitely - // succeeded. Note that we have taken precautions during command - // evaluation to avoid having mutations in the WriteBatch that affect - // the SSTable. Not doing so could result in order reversal (and missing - // values) here. If the key range we are ingesting into isn't empty, - // we're not using AddSSTable but a plain WriteBatch. - if raftCmd.ReplicatedEvalResult.AddSSTable != nil { - copied := addSSTablePreApply( - ctx, - r.store.cfg.Settings, - r.store.engine, - r.raftMu.sideloaded, - term, - raftIndex, - *raftCmd.ReplicatedEvalResult.AddSSTable, - r.store.limiters.BulkIOWriteRate, - ) - r.store.metrics.AddSSTableApplications.Inc(1) - if copied { - r.store.metrics.AddSSTableApplicationCopies.Inc(1) - } - raftCmd.ReplicatedEvalResult.AddSSTable = nil - } - - if raftCmd.ReplicatedEvalResult.Split != nil { - // Splits require a new HardState to be written to the new RHS - // range (and this needs to be atomic with the main batch). This - // cannot be constructed at evaluation time because it differs - // on each replica (votes may have already been cast on the - // uninitialized replica). Transform the write batch to add the - // updated HardState. - // See https://github.com/cockroachdb/cockroach/issues/20629 - // - // This is not the most efficient, but it only happens on splits, - // which are relatively infrequent and don't write much data. - tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - splitPreApply(ctx, tmpBatch, raftCmd.ReplicatedEvalResult.Split.SplitTrigger) - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { - // Merges require the subsumed range to be atomically deleted when the - // merge transaction commits. - // - // This is not the most efficient, but it only happens on merges, - // which are relatively infrequent and don't write much data. 
- tmpBatch := r.store.engine.NewBatch() - if err := tmpBatch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - log.Fatal(ctx, err) - } - rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - const destroyData = false - err = rhsRepl.preDestroyRaftMuLocked(ctx, tmpBatch, tmpBatch, merge.RightDesc.NextReplicaID, destroyData) - if err != nil { - log.Fatal(ctx, err) - } - writeBatch.Data = tmpBatch.Repr() - tmpBatch.Close() - } - - { - var err error - raftCmd.ReplicatedEvalResult, err = r.applyRaftCommand( - ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch) - - // applyRaftCommand returned an error, which usually indicates - // either a serious logic bug in CockroachDB or a disk - // corruption/out-of-space issue. Make sure that these fail with - // descriptive message so that we can differentiate the root causes. - if err != nil { - log.Errorf(ctx, "unable to update the state machine: %+v", err) - // Report the fatal error separately and only with the error, as that - // triggers an optimization for which we directly report the error to - // sentry (which in turn allows sentry to distinguish different error - // types). - log.Fatal(ctx, err) - } - } - - if filter := r.store.cfg.TestingKnobs.TestingPostApplyFilter; pErr == nil && filter != nil { - var newPropRetry int - newPropRetry, pErr = filter(storagebase.ApplyFilterArgs{ - CmdID: idKey, - ReplicatedEvalResult: raftCmd.ReplicatedEvalResult, - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if proposalRetry == 0 { - proposalRetry = proposalReevaluationReason(newPropRetry) - } - - } - - // calling maybeSetCorrupt here is mostly for tests and looks. The - // interesting errors originate in applyRaftCommand, and they are - // already handled above. - pErr = r.maybeSetCorrupt(ctx, pErr) - if pErr == nil { - pErr = forcedErr - } - - var lResult *result.LocalResult - if proposedLocally { - if proposalRetry != proposalNoReevaluation && pErr == nil { - log.Fatalf(ctx, "proposal with nontrivial retry behavior, but no error: %+v", proposal) - } - if pErr != nil { - // A forced error was set (i.e. we did not apply the proposal, - // for instance due to its log position) or the Replica is now - // corrupted. - // If proposalRetry is set, we don't also return an error, as per the - // proposalResult contract. - if proposalRetry == proposalNoReevaluation { - response.Err = pErr - } - } else if proposal.Local.Reply != nil { - response.Reply = proposal.Local.Reply - } else { - log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", proposal) - } - response.Intents = proposal.Local.DetachIntents() - response.EndTxns = proposal.Local.DetachEndTxns(pErr != nil) - if pErr == nil { - lResult = proposal.Local - } - } - if pErr != nil && lResult != nil { - log.Fatalf(ctx, "shouldn't have a local result if command processing failed. pErr: %s", pErr) - } - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, lResult.String()) - } - - // Handle the Result, executing any side effects of the last - // state machine transition. - // - // Note that this must happen after committing (the engine.Batch), but - // before notifying a potentially waiting client. - r.handleEvalResultRaftMuLocked(ctx, lResult, - raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex) - - // Provide the command's corresponding logical operations to the - // Replica's rangefeed. 
Only do so if the WriteBatch is non-nil, - // otherwise it's valid for the logical op log to be nil, which - // would shut down all rangefeeds. If no rangefeed is running, - // this call will be a no-op. - if raftCmd.WriteBatch != nil { - r.handleLogicalOpLogRaftMuLocked(ctx, raftCmd.LogicalOpLog) - } else if raftCmd.LogicalOpLog != nil { - log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", raftCmd) - } - } - - // When set to true, recomputes the stats for the LHS and RHS of splits and - // makes sure that they agree with the state's range stats. - const expensiveSplitAssertion = false - - if expensiveSplitAssertion && raftCmd.ReplicatedEvalResult.Split != nil { - split := raftCmd.ReplicatedEvalResult.Split - lhsStatsMS := r.GetMVCCStats() - lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - - rhsStatsMS := rightReplica.GetMVCCStats() - rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos) - if err != nil { - log.Fatal(ctx, err) - } - - if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS)) - } - if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 { - log.Fatalf(ctx, "RHS split stats divergence diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS)) - } - } - - if proposedLocally { - // If we failed to apply at the right lease index, try again with - // a new one. This is important for pipelined writes, since they - // don't have a client watching to retry, so a failure to - // eventually apply the proposal would be a user-visible error. - // TODO(nvanbenschoten): This reproposal is not tracked by the - // quota pool. We should fix that. - if proposalRetry == proposalIllegalLeaseIndex && r.tryReproposeWithNewLeaseIndex(proposal) { - return false - } - // Otherwise, signal the command's status to the client. - proposal.finishApplication(response) - } else if response.Err != nil { - log.VEventf(ctx, 1, "applying raft command resulted in error: %s", response.Err) - } - - return raftCmd.ReplicatedEvalResult.ChangeReplicas != nil -} - // tryReproposeWithNewLeaseIndex is used by processRaftCommand to // repropose commands that have gotten an illegal lease index error, // and that we know could not have applied while their lease index was @@ -2073,7 +1490,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock( // applying the command to perform any necessary cleanup. 
func (r *Replica) maybeAcquireSplitMergeLock( ctx context.Context, raftCmd storagepb.RaftCommand, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { if split := raftCmd.ReplicatedEvalResult.Split; split != nil { return r.acquireSplitLock(ctx, &split.SplitTrigger) } else if merge := raftCmd.ReplicatedEvalResult.Merge; merge != nil { @@ -2084,7 +1501,7 @@ func (r *Replica) maybeAcquireSplitMergeLock( func (r *Replica) acquireSplitLock( ctx context.Context, split *roachpb.SplitTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { rightRng, created, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil) if err != nil { return nil, err @@ -2101,7 +1518,7 @@ func (r *Replica) acquireSplitLock( // commands that have reproposals interacting with retries (i.e. we don't // treat splits differently). - return func(rResult storagepb.ReplicatedEvalResult) { + return func(rResult *storagepb.ReplicatedEvalResult) { if rResult.Split == nil && created && !rightRng.IsInitialized() { // An error occurred during processing of the split and the RHS is still // uninitialized. Mark the RHS destroyed and remove it from the replica's @@ -2127,7 +1544,7 @@ func (r *Replica) acquireSplitLock( func (r *Replica) acquireMergeLock( ctx context.Context, merge *roachpb.MergeTrigger, -) (func(storagepb.ReplicatedEvalResult), error) { +) (func(*storagepb.ReplicatedEvalResult), error) { // The merge lock is the right-hand replica's raftMu. The right-hand replica // is required to exist on this store. Otherwise, an incoming snapshot could // create the right-hand replica before the merge trigger has a chance to @@ -2145,199 +1562,11 @@ func (r *Replica) acquireMergeLock( log.Fatalf(ctx, "RHS of merge %s <- %s not present on store; found %s in place of the RHS", merge.LeftDesc, merge.RightDesc, rightDesc) } - return func(storagepb.ReplicatedEvalResult) { + return func(*storagepb.ReplicatedEvalResult) { rightRepl.raftMu.Unlock() }, nil } -// applyRaftCommand applies a raft command from the replicated log to the -// underlying state machine (i.e. the engine). When the state machine can not be -// updated, an error (which is likely fatal!) is returned and must be handled by -// the caller. -// The returned ReplicatedEvalResult replaces the caller's. -func (r *Replica) applyRaftCommand( - ctx context.Context, - idKey storagebase.CmdIDKey, - rResult storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, - writeBatch *storagepb.WriteBatch, -) (storagepb.ReplicatedEvalResult, error) { - if writeBatch != nil && len(writeBatch.Data) > 0 { - // Record the write activity, passing a 0 nodeID because replica.writeStats - // intentionally doesn't track the origin of the writes. - mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) - if err != nil { - log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) - } else { - r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) - } - } - - r.mu.Lock() - usingAppliedStateKey := r.mu.state.UsingAppliedStateKey - oldRaftAppliedIndex := r.mu.state.RaftAppliedIndex - oldLeaseAppliedIndex := r.mu.state.LeaseAppliedIndex - oldTruncatedState := r.mu.state.TruncatedState - - // Exploit the fact that a split will result in a full stats - // recomputation to reset the ContainsEstimates flag. - // - // TODO(tschottdorf): We want to let the usual MVCCStats-delta - // machinery update our stats for the left-hand side. 
But there is no - // way to pass up an MVCCStats object that will clear out the - // ContainsEstimates flag. We should introduce one, but the migration - // makes this worth a separate effort (ContainsEstimates would need to - // have three possible values, 'UNCHANGED', 'NO', and 'YES'). - // Until then, we're left with this rather crude hack. - if rResult.Split != nil { - r.mu.state.Stats.ContainsEstimates = false - } - ms := *r.mu.state.Stats - r.mu.Unlock() - - if raftAppliedIndex != oldRaftAppliedIndex+1 { - // If we have an out of order index, there's corruption. No sense in - // trying to update anything or running the command. Simply return - // a corruption error. - return storagepb.ReplicatedEvalResult{}, errors.Errorf("applied index jumped from %d to %d", - oldRaftAppliedIndex, raftAppliedIndex) - } - - haveTruncatedState := rResult.State != nil && rResult.State.TruncatedState != nil - var batch engine.Batch - if !haveTruncatedState { - batch = r.store.Engine().NewWriteOnlyBatch() - } else { - // When we update the truncated state, we may need to read the batch - // and can't use a WriteOnlyBatch. This is fine since log truncations - // are tiny batches. - batch = r.store.Engine().NewBatch() - } - defer batch.Close() - - if writeBatch != nil { - if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to apply WriteBatch") - } - } - - // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. - writer := batch.Distinct() - - // Special-cased MVCC stats handling to exploit commutativity of stats delta - // upgrades. Thanks to commutativity, the spanlatch manager does not have to - // serialize on the stats key. - deltaStats := rResult.Delta.ToStats() - - if !usingAppliedStateKey && rResult.State != nil && rResult.State.UsingAppliedStateKey { - // The Raft command wants us to begin using the RangeAppliedState key - // and we haven't performed the migration yet. Delete the old keys - // that this new key is replacing. - err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to migrate to range applied state") - } - usingAppliedStateKey = true - } - - if usingAppliedStateKey { - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms.Add(deltaStats) - - // Set the range applied state, which includes the last applied raft and - // lease index along with the mvcc stats, all in one key. - if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, - raftAppliedIndex, leaseAppliedIndex, &ms); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set range applied state") - } - } else { - // Advance the last applied index. We use a blind write in order to avoid - // reading the previous applied index keys on every write operation. This - // requires a little additional work in order maintain the MVCC stats. 
- var appliedIndexNewMS enginepb.MVCCStats - if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS, - raftAppliedIndex, leaseAppliedIndex); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to set applied index") - } - deltaStats.SysBytes += appliedIndexNewMS.SysBytes - - r.raftMu.stateLoader.CalcAppliedIndexSysBytes(oldRaftAppliedIndex, oldLeaseAppliedIndex) - - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms.Add(deltaStats) - if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to update MVCCStats") - } - } - - if haveTruncatedState { - apply, err := handleTruncatedStateBelowRaft(ctx, oldTruncatedState, rResult.State.TruncatedState, r.raftMu.stateLoader, writer) - if err != nil { - return storagepb.ReplicatedEvalResult{}, err - } - if !apply { - // The truncated state was discarded, so make sure we don't apply - // it to our in-memory state. - rResult.State.TruncatedState = nil - rResult.RaftLogDelta = 0 - // We received a truncation that doesn't apply to us, so we know that - // there's a leaseholder out there with a log that has earlier entries - // than ours. That leader also guided our log size computations by - // giving us RaftLogDeltas for past truncations, and this was likely - // off. Mark our Raft log size is not trustworthy so that, assuming - // we step up as leader at some point in the future, we recompute - // our numbers. - r.mu.Lock() - r.mu.raftLogSizeTrusted = false - r.mu.Unlock() - } - } - - // TODO(peter): We did not close the writer in an earlier version of - // the code, which went undetected even though we used the batch after - // (though only to commit it). We should add an assertion to prevent that in - // the future. - writer.Close() - - start := timeutil.Now() - - var assertHS *raftpb.HardState - if util.RaceEnabled && rResult.Split != nil { - rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) - oldHS, err := rsl.LoadHardState(ctx, r.store.Engine()) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") - } - assertHS = &oldHS - } - if err := batch.Commit(false); err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "could not commit batch") - } - - if assertHS != nil { - // Load the HardState that was just committed (if any). - rsl := stateloader.Make(rResult.Split.RightDesc.RangeID) - newHS, err := rsl.LoadHardState(ctx, r.store.Engine()) - if err != nil { - return storagepb.ReplicatedEvalResult{}, errors.Wrap(err, "unable to load HardState") - } - // Assert that nothing moved "backwards". - if newHS.Term < assertHS.Term || (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { - log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", - pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) - } - } - - elapsed := timeutil.Since(start) - r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) - rResult.Delta = deltaStats.ToStatsDelta() - return rResult, nil -} - // handleTruncatedStateBelowRaft is called when a Raft command updates the truncated // state. 
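The applyRaftCommand removed above persisted the applied state (raft applied index, lease applied index, and MVCC stats) for every individual command. With application now staged into a single engine batch, that write only needs to happen for the last entry in the batch. A minimal sketch of the idea, assuming a toy map-backed "engine" and invented types rather than the real stateloader API:

```
package main

import "fmt"

// appliedState is a stand-in for the range applied state: the last applied
// raft index together with the stats of the range as of that index.
type appliedState struct {
	appliedIndex uint64
	sysBytes     int64
}

type entry struct {
	index      uint64
	key, value string
	deltaBytes int64
}

// applyBatch stages every entry into a single write batch (modeled as a map)
// and records the applied state only once, for the last entry in the batch.
func applyBatch(engine map[string]string, state *appliedState, entries []entry) error {
	batch := map[string]string{}
	pending := *state // batch-local copy of the applied state
	for _, e := range entries {
		if e.index != pending.appliedIndex+1 {
			return fmt.Errorf("applied index jumped from %d to %d", pending.appliedIndex, e.index)
		}
		batch[e.key] = e.value
		pending.appliedIndex = e.index
		pending.sysBytes += e.deltaBytes
	}
	// One applied-state write per batch instead of one per entry.
	batch["applied-state"] = fmt.Sprintf("idx=%d bytes=%d", pending.appliedIndex, pending.sysBytes)
	for k, v := range batch { // commit: copy the staged writes into the engine
		engine[k] = v
	}
	*state = pending
	return nil
}

func main() {
	eng := map[string]string{}
	st := appliedState{}
	if err := applyBatch(eng, &st, []entry{
		{index: 1, key: "a", value: "1", deltaBytes: 10},
		{index: 2, key: "b", value: "2", deltaBytes: 10},
	}); err != nil {
		panic(err)
	}
	fmt.Println(eng["applied-state"]) // idx=2 bytes=20
}
```

For a batch of N trivial entries this turns N applied-state writes into a single one.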
This isn't 100% trivial for two reasons: // - in 19.1 we're making the TruncatedState key unreplicated, so there's a migration @@ -2358,7 +1587,7 @@ func handleTruncatedStateBelowRaft( ctx context.Context, oldTruncatedState, newTruncatedState *roachpb.RaftTruncatedState, loader stateloader.StateLoader, - distinctEng engine.ReadWriter, + batch engine.ReadWriter, ) (_apply bool, _ error) { // If this is a log truncation, load the resulting unreplicated or legacy // replicated truncated state (in that order). If the migration is happening @@ -2368,7 +1597,7 @@ func handleTruncatedStateBelowRaft( // Either way, we'll update it below. // // See VersionUnreplicatedRaftTruncatedState for details. - truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, distinctEng) + truncStatePostApply, truncStateIsLegacy, err := loader.LoadRaftTruncatedState(ctx, batch) if err != nil { return false, errors.Wrap(err, "loading truncated state") } @@ -2390,7 +1619,7 @@ func handleTruncatedStateBelowRaft( // NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to // avoid allocating when constructing Raft log keys (16 bytes). unsafeKey := prefixBuf.RaftLogKey(idx) - if err := distinctEng.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { + if err := batch.Clear(engine.MakeMVCCMetadataKey(unsafeKey)); err != nil { return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState) } } @@ -2413,7 +1642,7 @@ func handleTruncatedStateBelowRaft( _ = cluster.VersionUnreplicatedRaftTruncatedState if err := engine.MVCCPutProto( - ctx, distinctEng, nil /* ms */, prefixBuf.RaftTruncatedStateKey(), + ctx, batch, nil /* ms */, prefixBuf.RaftTruncatedStateKey(), hlc.Timestamp{}, nil /* txn */, newTruncatedState, ); err != nil { return false, errors.Wrap(err, "unable to migrate RaftTruncatedState") diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 97e421f405c2..f59e7e956f10 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -900,8 +900,9 @@ func TestLeaseReplicaNotInDesc(t *testing.T) { }, } tc.repl.mu.Lock() - _, _, pErr := tc.repl.checkForcedErrLocked( + _, _, pErr := checkForcedErr( context.Background(), makeIDKey(), raftCmd, nil /* proposal */, false, /* proposedLocally */ + &tc.repl.mu.state, ) tc.repl.mu.Unlock() if _, isErr := pErr.GetDetail().(*roachpb.LeaseRejectedError); !isErr { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index c9b0af947974..bfea43aff073 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3610,8 +3610,8 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { // processing time means we'll have starved local replicas of ticks and // remote replicas will likely start campaigning. if elapsed >= defaultReplicaRaftMuWarnThreshold { - log.Warningf(ctx, "handle raft ready: %.1fs [processed=%d]", - elapsed.Seconds(), stats.processed) + log.Warningf(ctx, "handle raft ready: %.1fs [applied=%d, batches=%d, state_assertions=%d]", + elapsed.Seconds(), stats.entriesProcessed, stats.batchesProcessed, stats.stateAssertions) } if !r.IsInitialized() { // Only an uninitialized replica can have a placeholder since, by diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go index 313ab5cac8d8..23c1028bd867 100644 --- a/pkg/storage/track_raft_protos.go +++ b/pkg/storage/track_raft_protos.go @@ -34,7 +34,7 @@ func funcName(f interface{}) string { // instrumentation and returns the list of downstream-of-raft protos. 
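The handleTruncatedStateBelowRaft hunks above rename the distinctEng parameter to batch; the function's job is unchanged: decide whether a proposed log truncation still applies and, if so, clear the raft log entries it covers. A simplified stand-in for that logic (toy types; the legacy/unreplicated key migration the real function performs is omitted):

```
package main

import "fmt"

// truncState is a stand-in for the Raft truncated state: everything at or
// below index has been discarded from the log.
type truncState struct{ index uint64 }

// maybeTruncate loads nothing fancy: it ignores truncations that do not
// advance the persisted truncated state, and otherwise deletes the newly
// covered log entries and records the new state. It returns whether the
// truncation was applied.
func maybeTruncate(log map[uint64]string, persisted *truncState, proposed truncState) bool {
	if proposed.index <= persisted.index {
		// A stale or duplicate truncation; the in-memory state must not be
		// updated from it either.
		return false
	}
	for idx := persisted.index + 1; idx <= proposed.index; idx++ {
		delete(log, idx) // clear the raft log entries the truncation covers
	}
	*persisted = proposed
	return true
}

func main() {
	log := map[uint64]string{1: "a", 2: "b", 3: "c"}
	st := truncState{index: 0}
	fmt.Println(maybeTruncate(log, &st, truncState{index: 2}), len(log)) // true 1
	fmt.Println(maybeTruncate(log, &st, truncState{index: 1}), len(log)) // false 1
}
```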
func TrackRaftProtos() func() []reflect.Type { // Grab the name of the function that roots all raft operations. - processRaftFunc := funcName((*Replica).processRaftCommand) + stageRaftFunc := funcName((*Replica).stageRaftCommand) // We only need to track protos that could cause replica divergence // by being written to disk downstream of raft. whitelist := []string{ @@ -104,7 +104,7 @@ func TrackRaftProtos() func() []reflect.Type { break } - if strings.Contains(f.Function, processRaftFunc) { + if strings.Contains(f.Function, stageRaftFunc) { belowRaftProtos.Lock() belowRaftProtos.inner[t] = struct{}{} belowRaftProtos.Unlock()
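TrackRaftProtos now treats stageRaftCommand as the root of command application; it records a proto type whenever the stack at marshal time contains that function's name. The stack-walk test can be seen in isolation with a small standalone program (simplified; the hook into proto marshaling is omitted):

```
package main

import (
	"fmt"
	"reflect"
	"runtime"
	"strings"
)

// funcName returns the fully qualified name of f, mirroring the helper used to
// identify the function that roots command application.
func funcName(f interface{}) string {
	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}

// calledBelow reports whether the caller's stack contains root, the same
// "am I downstream of command application?" test the tracker performs.
func calledBelow(root string) bool {
	pc := make([]uintptr, 32)
	n := runtime.Callers(2, pc)
	frames := runtime.CallersFrames(pc[:n])
	for {
		f, more := frames.Next()
		if strings.Contains(f.Function, root) {
			return true
		}
		if !more {
			return false
		}
	}
}

// stageRaftCommand stands in for the real method; it simply invokes work that
// would marshal protos downstream of raft.
func stageRaftCommand(marshal func()) { marshal() }

func main() {
	root := funcName(stageRaftCommand)
	stageRaftCommand(func() {
		fmt.Println("below", root, "=>", calledBelow(root)) // true
	})
	fmt.Println("outside", root, "=>", calledBelow(root)) // false
}
```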