From fc95a5aa590fe9d7a0050eedf07dd1bbe5bfa2ca Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 1 Aug 2019 22:47:50 -0400 Subject: [PATCH] storage/apply: create apply package for raft entry application The new package provides abstractions and routines associated with the application of committed raft entries to a replicated state machine. This was inspired by four driving forces: - We've been having a number of discussions on the Core team about making storage abstractions clearer and easier to understand in isolation. One commonly discussed proposal is introducing a `storage/replicate` package that would encapsulate the concerns of raft replication (e.g. log manipulation, snapshots, leader election, heartbeats, etc.). This `storage/apply` package will fit in nicely alongside a replication abstraction. - Initial discussion on #38954 concluded that, with the current code structure, adding an optimization to acknowledge clients after their raft entries have been committed but before they have been applied would move in the opposite direction and make things even harder to understand due to the introduction of more complex state management. - Recent instability in this area (#38976, #39064, #39135, #39203) has revealed how difficult it is to test any of the logic involved in raft entry application. This has naturally led to testing at a distance using tools like testing hooks, which is frustrating and delicate. As a result, we're missing tests for things like old migrations that we still need to support. We also have trouble writing regression tests when bugs do pop up. - The proposed optimization in https://github.com/cockroachdb/cockroach/issues/17500#issuecomment-425689048 to apply committed raft entries to the Replica storage engine asynchronously in a separate thread from the raft processing thread will make entry application significantly more complex. For instance, we'll likely need to introduce a separate scheduler to coordinate entry application passes across Ranges on a node. The scheduler will likely want to prioritize leaders over followers and embed other policies to optimize for total system throughput. There's a strong desire to isolate this new complexity and to give the logic a place to live. The PR begins to address these concerns by formalizing the process of applying committed raft entries. To start, this makes the process easier to understand both in terms of the macro-level steps that are taken during application of a batch of entries and in terms of the impact that an individual command has on the replicated state machine. For instance, the PR helps provide answers to all of the following questions: - What are the stages of raft entry application? - What is the difference between a "raft entry" and a "replicated command"? - What can a command do besides apply its write batch to the storage engine? - What does it mean for a successfully replicated command to be rejected during application? - When can we acknowledge the outcome of a raft proposal? The refactor also uncovers a large testing surface that future PRs will exploit to write targeted unit tests. Not only can the `storage/apply` package be tested with a mock state machine (done in this PR), but we can test Replica's implementation of the state machine interface in isolation without needing to touch raft at all. Finally, the refactor paves the way for making the proposed change in #38954 in a much cleaner way.
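For reference, the intended use of the new package looks roughly like the following sketch, adapted from the package documentation added to `storage/apply/task.go` in this PR (getStateMachine, newDecoder, ctx, and ents stand in for the caller's own wiring and are not real helpers):

    sm := getStateMachine()  // the caller's apply.StateMachine implementation
    dec := newDecoder()      // the caller's apply.Decoder implementation

    t := apply.MakeTask(sm, dec)
    defer t.Close()
    if err := t.Decode(ctx, ents); err != nil {
        return err
    }
    if err := t.ApplyCommittedEntries(ctx); err != nil {
        return err
    }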
The #38954 change is demonstrated in the next commit, which is being included here to show why certain things were designed the way they were but will not be merged with this PR. Release note: None --- pkg/storage/apply/cmd.go | 204 ++++ pkg/storage/apply/task.go | 219 +++++ pkg/storage/apply/task_test.go | 218 +++++ pkg/storage/cmd_app_batch.go | 131 --- pkg/storage/cmd_app_ctx.go | 80 +- pkg/storage/cmd_app_ctx_buf.go | 158 +++- pkg/storage/cmd_app_ctx_buf_test.go | 12 +- pkg/storage/replica.go | 4 + pkg/storage/replica_application.go | 897 ------------------ pkg/storage/replica_application_impl.go | 1040 +++++++++++++++++++++ pkg/storage/replica_application_result.go | 409 +++----- pkg/storage/replica_proposal.go | 16 +- pkg/storage/replica_raft.go | 22 +- pkg/storage/replica_test.go | 6 +- pkg/storage/track_raft_protos.go | 7 +- 15 files changed, 2036 insertions(+), 1387 deletions(-) create mode 100644 pkg/storage/apply/cmd.go create mode 100644 pkg/storage/apply/task.go create mode 100644 pkg/storage/apply/task_test.go delete mode 100644 pkg/storage/cmd_app_batch.go delete mode 100644 pkg/storage/replica_application.go create mode 100644 pkg/storage/replica_application_impl.go diff --git a/pkg/storage/apply/cmd.go b/pkg/storage/apply/cmd.go new file mode 100644 index 000000000000..24c1b04b28e0 --- /dev/null +++ b/pkg/storage/apply/cmd.go @@ -0,0 +1,204 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package apply + +// Command is a command that has been successfully replicated through raft +// by being durably committed to the raft log of a quorum of peers in a raft +// group. +type Command interface { + // Index is the log index of the corresponding raft entry. + Index() uint64 + // IsTrivial returns whether the command can apply in a batch. + IsTrivial() bool + // IsLocal returns whether the command was locally proposed. + IsLocal() bool +} + +// CheckedCommand is a command that has been checked to see whether it can +// apply successfully or not. Committing an entry in a raft log and having +// the command in that entry succeed are similar but not equivalent concepts. +// A successfully committed entry may contain a command that the replicated +// state machine decides to reject (deterministically). +type CheckedCommand interface { + Command + // Rejected returns whether the command was rejected. + Rejected() bool +} + +// AppliedCommand is a command that has been applied to the replicated state +// machine. A command is considered "applied" if it has been staged in a +// Batch which has been committed and had its side-effects run on the state +// machine. If the command was rejected (see CheckedCommand), applying the +// command will likely be a no-op, but that is up to the implementation of +// the state machine. +type AppliedCommand interface { + CheckedCommand + // AckOutcomeAndFinish acknowledges the outcome of the command to its + // client. It also signals that the application of the command has + // completed. + AckOutcomeAndFinish() error +} + +// CommandIteratorBase is a common interface extended by all iterator and +// list variants. It is exported so its methods are displayed in godoc when +// it is embedded in other interfaces.
+type CommandIteratorBase interface { + // Valid returns whether the iterator is pointing at a valid element. + Valid() bool + // Next advances the iterator. Should not be called if valid is false. + Next() + // NewList returns a new empty command list. Usages of the list will + // always advance the iterator before pushing into the list, so + // implementors are free to share backing memory between the two. + NewList() CommandList + // NewCheckedList returns a new empty checked command list. Usages + // of the list will always advance the iterator before pushing into + // the list, so implementors are free to share backing memory + // between the two. + NewCheckedList() CheckedCommandList + // NewAppliedList returns a new empty applied command list. Usages + // of the list will always advance the iterator before pushing into + // the list, so implementors are free to share backing memory + // between the two. + NewAppliedList() AppliedCommandList + // Close closes the iterator. Once closed, it must not be used. + Close() +} + +// CommandIterator is an iterator over replicated commands. +type CommandIterator interface { + CommandIteratorBase + // Cur returns the command that the iterator is currently pointing at. + // Should not be called if valid is false. + Cur() Command +} + +// CommandList is a list of replicated commands. +type CommandList interface { + CommandIterator + // Append pushes the command on to the back of the list. + Append(Command) +} + +// CheckedCommandIterator is an iterator over checked replicated +// commands. +type CheckedCommandIterator interface { + CommandIteratorBase + // CurChecked returns the checked command that the iterator is currently + // pointing at. Should not be called if valid is false. + CurChecked() CheckedCommand +} + +// CheckedCommandList is a list of checked replicated commands. +type CheckedCommandList interface { + CheckedCommandIterator + // AppendChecked pushes the checked command on to the back of the list. + AppendChecked(CheckedCommand) +} + +// AppliedCommandIterator is an iterator over applied replicated commands. +type AppliedCommandIterator interface { + CommandIteratorBase + // CurApplied returns the applied command that the iterator is currently + // pointing at. Should not be called if valid is false. + CurApplied() AppliedCommand +} + +// AppliedCommandList is a list of applied replicated commands. +type AppliedCommandList interface { + AppliedCommandIterator + // AppendApplied pushes the applied command on to the back of the list. + AppendApplied(AppliedCommand) +} + +// takeWhileCmdIter returns an iterator that yields commands based on a +// predicate. It will call the predicate on each command in the provided +// iterator and yield elements while it returns true. The function does +// NOT close the provided iterator, but does drain it of any commands +// that are moved to the returned iterator. +func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIterator { + ret := iter.NewList() + for iter.Valid() { + cmd := iter.Cur() + if !pred(cmd) { + break + } + iter.Next() + ret.Append(cmd) + } + return ret +} + +// mapCmdIter returns an iterator that contains the result of each command +// from the provided iterator transformed by a closure. The closure is +// responsible for converting Commands into CheckedCommand. The function +// closes the provided iterator.
+func mapCmdIter( + iter CommandIterator, fn func(Command) (CheckedCommand, error), +) (CheckedCommandIterator, error) { + defer iter.Close() + ret := iter.NewCheckedList() + for iter.Valid() { + checked, err := fn(iter.Cur()) + if err != nil { + return nil, err + } + iter.Next() + ret.AppendChecked(checked) + } + return ret, nil +} + +// mapCheckedCmdIter returns an iterator that contains the result of each +// command from the provided iterator transformed by a closure. The closure +// is responsible for converting CheckedCommand into AppliedCommand. The +// function closes the provided iterator. +func mapCheckedCmdIter( + iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error), +) (AppliedCommandIterator, error) { + defer iter.Close() + ret := iter.NewAppliedList() + for iter.Valid() { + applied, err := fn(iter.CurChecked()) + if err != nil { + return nil, err + } + iter.Next() + ret.AppendApplied(applied) + } + return ret, nil +} + +// forEachCheckedCmdIter calls a closure on each command in the provided +// iterator. The function closes the provided iterator. +func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error { + defer iter.Close() + for iter.Valid() { + if err := fn(iter.CurChecked()); err != nil { + return err + } + iter.Next() + } + return nil +} + +// forEachAppliedCmdIter calls a closure on each command in the provided +// iterator. The function closes the provided iterator. +func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error { + defer iter.Close() + for iter.Valid() { + if err := fn(iter.CurApplied()); err != nil { + return err + } + iter.Next() + } + return nil +} diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go new file mode 100644 index 000000000000..68513925cf60 --- /dev/null +++ b/pkg/storage/apply/task.go @@ -0,0 +1,219 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package apply provides abstractions and routines associated with the +// application of committed raft entries to a replicated state machine. +package apply + +import ( + "context" + + "go.etcd.io/etcd/raft/raftpb" +) + +// StateMachine represents an instance of a replicated state machine being +// driven by a replication group. The state machine accepts Commands that +// have been committed to the replication group's log and applies them to +// advance to a new state. +// +// All state transitions performed by the state machine are expected to be +// deterministic, which ensures that if each instance is driven from the +// same consistent shared log, they will all stay in sync. +type StateMachine interface { + // NewBatch creates a new batch that is suitable for accumulating the + // effects that a group of Commands will have on the replicated state + // machine. Commands are staged in the batch one-by-one and then the + // entire batch is committed at once. + NewBatch() Batch + // ApplySideEffects applies the in-memory side-effects of a Command to + // the replicated state machine. The method will be called in the order + // that the commands are committed to the state machine's log. 
It will + // always be called with a Command that has been checked and whose Batch + // has already been committed. + ApplySideEffects(CheckedCommand) (AppliedCommand, error) +} + +// Batch accumulates a series of updates from Commands and applies them all +// at once to its StateMachine when committed. Groups of Commands will be +// staged in the Batch such that one or more trivial Commands are staged or +// exactly one non-trivial Command is staged. +type Batch interface { + // Stage inserts a Command into the Batch. + Stage(Command) (CheckedCommand, error) + // Commit commits the updates staged in the Batch to the StateMachine. + Commit(context.Context) error + // Close closes the batch and releases any resources that it holds. + Close() +} + +// Decoder is capable of decoding a list of committed raft entries and +// binding any that were locally proposed to their local proposals. +type Decoder interface { + // DecodeAndBind decodes each of the provided raft entries into commands + // and binds any that were proposed locally to their local proposals. + // The method must only be called once per applier. It returns whether + // any of the commands were bound to local proposals waiting for + // acknowledgement. + DecodeAndBind(context.Context, []raftpb.Entry) (anyLocal bool, _ error) + // NewCommandIter creates an iterator over the replicated commands that + // were passed to DecodeAndBind. The method must not be called until + // after DecodeAndBind is called. + NewCommandIter() CommandIterator + // Reset resets the Decoder and releases any resources that it holds. + Reset() +} + +// Task is an object capable of coordinating the application of commands to +// a replicated state machine after they have been durably committed to a +// raft log. +// +// Committed raft entries are provided to the task through its Decode +// method. The task will then apply these entries to the provided state +// machine when ApplyCommittedEntries is called. +// +// Example use: +// +// sm := getStateMachine() +// dec := newDecoder() +// +// t := apply.MakeTask(sm, dec) +// defer t.Close() +// if err := t.Decode(ctx, ents); err != nil { +// return err +// } +// if err := t.ApplyCommittedEntries(ctx); err != nil { +// return err +// } +// +type Task struct { + sm StateMachine + dec Decoder + + // Have entries been decoded yet? + decoded bool + // Were any of the decoded commands locally proposed? + anyLocal bool + // The maximum number of commands that can be applied in a batch. + batchSize int32 +} + +// MakeTask creates a new task with the provided state machine and decoder. +func MakeTask(sm StateMachine, dec Decoder) Task { + return Task{sm: sm, dec: dec} +} + +// Decode decodes the committed raft entries into commands and prepares for the +// commands to be applied to the replicated state machine. +func (t *Task) Decode(ctx context.Context, committedEntries []raftpb.Entry) error { + var err error + t.anyLocal, err = t.dec.DecodeAndBind(ctx, committedEntries) + t.decoded = true + return err +} + +func (t *Task) assertDecoded() { + if !t.decoded { + panic("Task.Decode not called yet") + } +} + +// SetMaxBatchSize sets the maximum application batch size. If 0, no limit +// will be placed on the number of commands that can be applied in a batch. +func (t *Task) SetMaxBatchSize(size int) { + t.batchSize = int32(size) +} + +// ApplyCommittedEntries applies raft entries that have been committed to the +// raft log but have not yet been applied to the replicated state machine.
+func (t *Task) ApplyCommittedEntries(ctx context.Context) error { + t.assertDecoded() + + iter := t.dec.NewCommandIter() + defer iter.Close() + for iter.Valid() { + if err := t.applyOneBatch(ctx, iter); err != nil { + return err + } + } + return nil +} + +// applyOneBatch consumes a batch-worth of commands from the provided iter and +// applies them atomically using the applier. A batch will contain either: +// a) one or more trivial commands +// b) exactly one non-trivial command +func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error { + // Create a new application batch. + batch := t.sm.NewBatch() + defer batch.Close() + + // Consume a batch-worth of commands. + pol := trivialPolicy{maxCount: t.batchSize} + batchIter := takeWhileCmdIter(iter, func(cmd Command) bool { + return pol.maybeAdd(cmd.IsTrivial()) + }) + + // Stage each command in the batch. + stagedIter, err := mapCmdIter(batchIter, batch.Stage) + if err != nil { + return err + } + + // Commit the batch to the storage engine. + if err := batch.Commit(ctx); err != nil { + return err + } + + // Apply the side-effects of each command. + appliedIter, err := mapCheckedCmdIter(stagedIter, t.sm.ApplySideEffects) + if err != nil { + return err + } + + // Acknowledge the outcome of each command. + return forEachAppliedCmdIter(appliedIter, AppliedCommand.AckOutcomeAndFinish) +} + +// trivialPolicy encodes a batching policy that allows a batch to consist of +// either one or more trivial commands or exactly one non-trivial command. +type trivialPolicy struct { + maxCount int32 + + trivialCount int32 + nonTrivialCount int32 +} + +// maybeAdd returns whether a command with the specified triviality should be +// added to a batch given the batching policy. If the method returns true, the +// command is considered to have been added. +func (p *trivialPolicy) maybeAdd(trivial bool) bool { + if !trivial { + if p.trivialCount+p.nonTrivialCount > 0 { + return false + } + p.nonTrivialCount++ + return true + } + if p.nonTrivialCount > 0 { + return false + } + if p.maxCount > 0 && p.maxCount == p.trivialCount { + return false + } + p.trivialCount++ + return true +} + +// Close ends the task, releasing any resources that it holds and resetting the +// Decoder. The Task cannot be used again after being closed. +func (t *Task) Close() { + t.dec.Reset() + *t = Task{} +} diff --git a/pkg/storage/apply/task_test.go b/pkg/storage/apply/task_test.go new file mode 100644 index 000000000000..f0def17543be --- /dev/null +++ b/pkg/storage/apply/task_test.go @@ -0,0 +1,218 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package apply_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/raftpb" +) + +type cmd struct { + index uint64 + trivial bool + local bool + + shouldReject bool + acked bool + finished bool +} + +type checkedCmd struct { + *cmd + rejected bool +} + +type appliedCmd struct { + *checkedCmd +} + +func (c *cmd) Index() uint64 { return c.index } +func (c *cmd) IsTrivial() bool { return c.trivial } +func (c *cmd) IsLocal() bool { return c.local } +func (c *checkedCmd) Rejected() bool { return c.rejected } +func (c *checkedCmd) CanAckBeforeApplication() bool { return false } +func (c *checkedCmd) AckSuccess() error { + c.acked = true + return nil +} +func (c *appliedCmd) AckOutcomeAndFinish() error { + c.acked = true + c.finished = true + return nil +} + +type cmdSlice []cmd +type checkedCmdSlice []checkedCmd +type appliedCmdSlice []appliedCmd + +func (s *cmdSlice) Valid() bool { return len(*s) > 0 } +func (s *cmdSlice) Next() { *s = (*s)[1:] } +func (s *cmdSlice) NewList() apply.CommandList { return new(cmdSlice) } +func (s *cmdSlice) NewCheckedList() apply.CheckedCommandList { return new(checkedCmdSlice) } +func (s *cmdSlice) NewAppliedList() apply.AppliedCommandList { return new(appliedCmdSlice) } +func (s *cmdSlice) Close() {} +func (s *cmdSlice) Cur() apply.Command { return &(*s)[0] } +func (s *cmdSlice) Append(c apply.Command) { *s = append(*s, *c.(*cmd)) } + +func (s *checkedCmdSlice) Valid() bool { return len(*s) > 0 } +func (s *checkedCmdSlice) Next() { *s = (*s)[1:] } +func (s *checkedCmdSlice) NewList() apply.CommandList { return new(cmdSlice) } +func (s *checkedCmdSlice) NewCheckedList() apply.CheckedCommandList { return new(checkedCmdSlice) } +func (s *checkedCmdSlice) NewAppliedList() apply.AppliedCommandList { return new(appliedCmdSlice) } +func (s *checkedCmdSlice) Close() {} +func (s *checkedCmdSlice) CurChecked() apply.CheckedCommand { return &(*s)[0] } +func (s *checkedCmdSlice) AppendChecked(c apply.CheckedCommand) { *s = append(*s, *c.(*checkedCmd)) } + +func (s *appliedCmdSlice) Valid() bool { return len(*s) > 0 } +func (s *appliedCmdSlice) Next() { *s = (*s)[1:] } +func (s *appliedCmdSlice) NewList() apply.CommandList { return new(cmdSlice) } +func (s *appliedCmdSlice) NewCheckedList() apply.CheckedCommandList { return new(checkedCmdSlice) } +func (s *appliedCmdSlice) NewAppliedList() apply.AppliedCommandList { return new(appliedCmdSlice) } +func (s *appliedCmdSlice) Close() {} +func (s *appliedCmdSlice) CurApplied() apply.AppliedCommand { return &(*s)[0] } +func (s *appliedCmdSlice) AppendApplied(c apply.AppliedCommand) { *s = append(*s, *c.(*appliedCmd)) } + +var _ apply.Command = &cmd{} +var _ apply.CheckedCommand = &checkedCmd{} +var _ apply.AppliedCommand = &appliedCmd{} +var _ apply.CommandList = &cmdSlice{} +var _ apply.CheckedCommandList = &checkedCmdSlice{} +var _ apply.AppliedCommandList = &appliedCmdSlice{} + +type testingStateMachine struct { + batches [][]uint64 + applied []uint64 + appliedSideEffects []uint64 + batchOpen bool +} + +func (sm *testingStateMachine) NewBatch(mock bool) apply.Batch { + if sm.batchOpen { + panic("batch not closed") + } + sm.batchOpen = true + return &testingBatch{sm: sm, mock: mock} +} +func (sm *testingStateMachine) ApplySideEffects( + cmdI apply.CheckedCommand, +) (apply.AppliedCommand, error) { + cmd := cmdI.(*checkedCmd) + sm.appliedSideEffects = append(sm.appliedSideEffects, cmd.index) + 
acmd := appliedCmd{checkedCmd: cmd} + return &acmd, nil +} + +type testingBatch struct { + sm *testingStateMachine + mock bool + staged []uint64 +} + +func (b *testingBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { + cmd := cmdI.(*cmd) + b.staged = append(b.staged, cmd.index) + ccmd := checkedCmd{cmd: cmd, rejected: cmd.shouldReject} + return &ccmd, nil +} +func (b *testingBatch) Commit(_ context.Context) error { + if b.mock { + return errors.New("can't commit a mock batch") + } + b.sm.batches = append(b.sm.batches, b.staged) + b.sm.applied = append(b.sm.applied, b.staged...) + return nil +} +func (b *testingBatch) Close() { + b.sm.batchOpen = false +} + +type testingDecoder struct{ cmds []cmd } + +func (d *testingDecoder) DecodeAndBind(_ context.Context, _ []raftpb.Entry) (bool, error) { + return true, nil +} +func (d *testingDecoder) NewCommandIter() apply.CommandIterator { + return (*cmdSlice)(&d.cmds) +} +func (d *testingDecoder) Reset() {} + +func TestApplyCommittedEntries(t *testing.T) { + ctx := context.Background() + sm := testingStateMachine{} + dec := testingDecoder{cmds: []cmd{ + {index: 1, trivial: true, local: true, shouldReject: false}, + {index: 2, trivial: true, local: true, shouldReject: false}, + {index: 3, trivial: false, local: true, shouldReject: false}, + {index: 4, trivial: false, local: true, shouldReject: false}, + {index: 5, trivial: true, local: true, shouldReject: false}, + {index: 6, trivial: false, local: true, shouldReject: false}, + }} + + // Use an apply.Task to apply all commands. + appT := apply.MakeTask(&sm, &dec) + defer appT.Close() + require.NoError(t, appT.Decode(ctx, nil /* ents */)) + require.NoError(t, appT.ApplyCommittedEntries(ctx)) + + // Assert that all commands were applied in the correct batches. + exp := testingStateMachine{ + batches: [][]uint64{{1, 2}, {3}, {4}, {5}, {6}}, + applied: []uint64{1, 2, 3, 4, 5, 6}, + appliedSideEffects: []uint64{1, 2, 3, 4, 5, 6}, + } + require.Equal(t, exp, sm) + + // Assert that all commands were acknowledged and finished. + for _, cmd := range dec.cmds { + require.True(t, cmd.acked) + require.True(t, cmd.finished) + } +} + +func TestApplyCommittedEntriesWithBatchSize(t *testing.T) { + ctx := context.Background() + sm := testingStateMachine{} + dec := testingDecoder{cmds: []cmd{ + {index: 1, trivial: true, local: true, shouldReject: false}, + {index: 2, trivial: true, local: true, shouldReject: false}, + {index: 3, trivial: true, local: true, shouldReject: false}, + {index: 4, trivial: false, local: true, shouldReject: false}, + {index: 5, trivial: true, local: true, shouldReject: false}, + {index: 6, trivial: true, local: true, shouldReject: false}, + {index: 7, trivial: true, local: true, shouldReject: false}, + }} + + // Use an apply.Task to apply all commands with a batch size limit. + appT := apply.MakeTask(&sm, &dec) + appT.SetMaxBatchSize(2) + defer appT.Close() + require.NoError(t, appT.Decode(ctx, nil /* ents */)) + require.NoError(t, appT.ApplyCommittedEntries(ctx)) + + // Assert that all commands were applied in the correct batches. + exp := testingStateMachine{ + batches: [][]uint64{{1, 2}, {3}, {4}, {5, 6}, {7}}, + applied: []uint64{1, 2, 3, 4, 5, 6, 7}, + appliedSideEffects: []uint64{1, 2, 3, 4, 5, 6, 7}, + } + require.Equal(t, exp, sm) + + // Assert that all commands were acknowledged and finished. 
+ for _, cmd := range dec.cmds { + require.True(t, cmd.acked) + require.True(t, cmd.finished) + } +} diff --git a/pkg/storage/cmd_app_batch.go b/pkg/storage/cmd_app_batch.go deleted file mode 100644 index 54cb3b695c66..000000000000 --- a/pkg/storage/cmd_app_batch.go +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2019 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package storage - -import ( - "context" - "sync" - - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/storagepb" - "go.etcd.io/etcd/raft/raftpb" -) - -// cmdAppBatch accumulates state due to the application of raft -// commands. Committed raft commands are applied to the batch in a multi-stage -// process whereby individual commands are decoded, prepared for application -// relative to the current view of replicaState, committed to the storage -// engine, applied to the Replica's in-memory state and then finished by -// releasing their latches and notifying clients. -type cmdAppBatch struct { - // decodeBuf is used to decode an entry before adding it to the batch. - // See decode(). - decodeBuf decodedRaftEntry - - // batch accumulates writes implied by the raft entries in this batch. - batch engine.Batch - - // replicaState is this batch's view of the replica's state. - // It is copied from under the Replica.mu when the batch is initialized and - // is updated in stageTrivialReplicatedEvalResult. - replicaState storagepb.ReplicaState - - // stats is stored on the application batch to avoid an allocation in tracking - // the batch's view of replicaState. All pointer fields in replicaState other - // than Stats are overwritten completely rather than updated in-place. - stats enginepb.MVCCStats - - // updatedTruncatedState tracks whether any command in the batch updated the - // replica's truncated state. Truncated state updates are considered trivial - // but represent something of a special case but given their relative - // frequency and the fact that multiple updates can be trivially coalesced, we - // treat updates to truncated state as trivial. If the batch updated the - // truncated state then after it has been committed, then the side-loaded data - // and raftentry.Cache should be truncated to the index indicated. - // TODO(ajwerner): consider whether this logic should imply that commands - // which update truncated state are non-trivial. - updatedTruncatedState bool - - cmdBuf cmdAppCtxBuf -} - -// cmdAppBatch structs are needed to apply raft commands, which is to -// say, frequently, so best to pool them rather than allocated under the raftMu. -var cmdAppBatchSyncPool = sync.Pool{ - New: func() interface{} { - return new(cmdAppBatch) - }, -} - -func getCmdAppBatch() *cmdAppBatch { - return cmdAppBatchSyncPool.Get().(*cmdAppBatch) -} - -func releaseCmdAppBatch(b *cmdAppBatch) { - b.cmdBuf.clear() - *b = cmdAppBatch{} - cmdAppBatchSyncPool.Put(b) -} - -// add adds adds the entry and its decoded state to the end of the batch. 
-func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) { - s := b.cmdBuf.allocate() - s.decodedRaftEntry = d - s.e = e -} - -// decode decodes commands from toProcess until either all of the commands have -// been added to the batch or a non-trivial command is decoded. Non-trivial -// commands must always be in their own batch. If a non-trivial command is -// encountered the batch is returned immediately without adding the newly -// decoded command to the batch or removing it from remaining. -// It is the client's responsibility to deal with non-trivial commands. -// -// numEmptyEntries indicates the number of entries in the consumed portion of -// toProcess contained a zero-byte payload. -func (b *cmdAppBatch) decode( - ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry, maxEntries int, -) ( - foundNonTrivialEntry bool, - numEmptyEntries int, - remaining []raftpb.Entry, - errExpl string, - err error, -) { - for len(toProcess) > 0 && (maxEntries <= 0 || int(b.cmdBuf.len) < maxEntries) { - e := &toProcess[0] - if len(e.Data) == 0 { - numEmptyEntries++ - } - if errExpl, err = decodeBuf.decode(ctx, e); err != nil { - return false, numEmptyEntries, nil, errExpl, err - } - // This is a non-trivial entry which needs to be processed alone. - foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(), - b.replicaState.UsingAppliedStateKey) - if foundNonTrivialEntry { - break - } - // We're going to process this entry in this batch so pop it from toProcess - // and add to appStates. - toProcess = toProcess[1:] - b.add(e, *decodeBuf) - } - return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil -} - -func (b *cmdAppBatch) reset() { - b.cmdBuf.clear() - *b = cmdAppBatch{ - decodeBuf: b.decodeBuf, // preserve the previously decoded entry - } -} diff --git a/pkg/storage/cmd_app_ctx.go b/pkg/storage/cmd_app_ctx.go index 5df654506d41..16c01ab5b040 100644 --- a/pkg/storage/cmd_app_ctx.go +++ b/pkg/storage/cmd_app_ctx.go @@ -23,19 +23,17 @@ import ( "go.etcd.io/etcd/raft/raftpb" ) -// cmdAppCtx stores the state required to apply a single raft -// entry to a replica. The state is accumulated in stages which occur in -// Replica.handleCommittedEntriesRaftMuLocked. From a high level, a command is -// decoded into an entryApplicationBatch, then if it was proposed locally the -// proposal is populated from the replica's proposals map, then the command -// is staged into the batch by writing its update to the batch's engine.Batch -// and applying its "trivial" side-effects to the batch's view of ReplicaState. -// Then the batch is committed, the side-effects are applied and the local -// result is processed. +// cmdAppCtx stores the state required to apply a single raft entry to a +// replica. The state is accumulated in stages which occur in apply.Task. From +// a high level, the command is decoded from a committed raft entry, then if it +// was proposed locally the proposal is populated from the replica's proposals +// map, then the command is staged into a replicaAppBatch by writing its update +// to the batch's engine.Batch and applying its "trivial" side-effects to the +// batch's view of ReplicaState. Then the batch is committed, the side-effects +// are applied and the local result is processed. type cmdAppCtx struct { - // e is the Entry being applied. - e *raftpb.Entry - decodedRaftEntry // decoded from e. 
+ ent *raftpb.Entry // the raft.Entry being applied + decodedRaftEntry // decoded from ent // proposal is populated on the proposing Replica only and comes from the // Replica's proposal map. @@ -44,25 +42,24 @@ type cmdAppCtx struct { // be populated with the handleCommittedEntries ctx. ctx context.Context - // The below fields are set during stageRaftCommand when we validate that - // a command applies given the current lease in checkForcedErr. + // The following fields are set in checkShouldApplyCommand when we validate + // that a command applies given the current lease and GC threshold. They are + // set when transforming an apply.Command into an apply.CheckedCommand. leaseIndex uint64 forcedErr *roachpb.Error proposalRetry proposalReevaluationReason - mutationCount int // number of mutations in the WriteBatch, for writeStats - // splitMergeUnlock is acquired for splits and merges. + // splitMergeUnlock is acquired for splits and merges when they are staged + // in the application batch and called after the command's side effects + // are applied. splitMergeUnlock func() - // The below fields are set after the data has been written to the storage - // engine in prepareLocalResult. + // The following fields are set after the data has been written to the + // storage engine in prepareLocalResult. They are set when transforming + // an apply.CheckedCommand into an apply.AppliedCommand. localResult *result.LocalResult response proposalResult } -func (cmd *cmdAppCtx) proposedLocally() bool { - return cmd.proposal != nil -} - // decodedRaftEntry represents the deserialized content of a raftpb.Entry. type decodedRaftEntry struct { idKey storagebase.CmdIDKey @@ -76,8 +73,10 @@ type decodedConfChange struct { ccCtx ConfChangeContext } -func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { - return &d.raftCmd.ReplicatedEvalResult +// decode decodes the entry e into the cmdAppCtx. +func (c *cmdAppCtx) decode(ctx context.Context, e *raftpb.Entry) (errExpl string, err error) { + c.ent = e + return c.decodedRaftEntry.decode(ctx, e) } // decode decodes the entry e into the decodedRaftEntry. @@ -132,3 +131,36 @@ func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) (errExpl strin d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID) return "", nil } + +func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { + return &d.raftCmd.ReplicatedEvalResult +} + +// Index implements the apply.Command interface. +func (c *cmdAppCtx) Index() uint64 { + return c.ent.Index +} + +// IsTrivial implements the apply.Command interface. +func (c *cmdAppCtx) IsTrivial() bool { + return isTrivial(c.replicatedResult()) +} + +// IsLocal implements the apply.Command interface. +func (c *cmdAppCtx) IsLocal() bool { + return c.proposal != nil +} + +// Rejected implements the apply.CheckedCommand interface. +func (c *cmdAppCtx) Rejected() bool { + return c.forcedErr != nil +} + +// AckOutcomeAndFinish implements the apply.AppliedCommand interface. +func (c *cmdAppCtx) AckOutcomeAndFinish() error { + if !c.IsLocal() { + return nil + } + c.proposal.finishApplication(c.response) + return nil +} diff --git a/pkg/storage/cmd_app_ctx_buf.go b/pkg/storage/cmd_app_ctx_buf.go index c16640da4f5e..5b3f5756959f 100644 --- a/pkg/storage/cmd_app_ctx_buf.go +++ b/pkg/storage/cmd_app_ctx_buf.go @@ -10,13 +10,21 @@ package storage -import "sync" +import ( + "sync" -// cmdAppCtxBufNodeSize is the size of the arrays in an -// cmdAppStateBufNode. 
+ "github.com/cockroachdb/cockroach/pkg/storage/apply" +) + +// cmdAppCtxBufNodeSize is the size of the arrays in an cmdAppStateBufNode. // TODO(ajwerner): justify this number. const cmdAppCtxBufNodeSize = 8 +// cmdAppCtxBufSliceFreeListSize is the size of the cmdAppCtxBufSlice free +// list. The size has been tuned for the maximum number of iterators that +// the storage/apply package uses at once. +const cmdAppCtxBufSliceFreeListSize = 3 + // cmdAppCtxBuf is an allocation-efficient buffer used during the // application of raft entries. Initialization occurs lazily upon the first // call to allocate but used cmdAppCtxBuf objects should be released @@ -25,10 +33,10 @@ const cmdAppCtxBufNodeSize = 8 type cmdAppCtxBuf struct { len int32 head, tail *cmdAppCtxBufNode + free cmdAppCtxBufSliceFreeList } -// cmdAppCtxBufNode is a linked-list element in an -// cmdAppStateBuf. +// cmdAppCtxBufNode is a linked-list element in an cmdAppStateBuf. type cmdAppCtxBufNode struct { len int32 buf [cmdAppCtxBufNodeSize]cmdAppCtx @@ -72,45 +80,133 @@ func (buf *cmdAppCtxBuf) clear() { *buf = cmdAppCtxBuf{} } -// last returns a pointer to the last element in the buffer. -func (buf *cmdAppCtxBuf) last() *cmdAppCtx { - return &buf.tail.buf[buf.tail.len-1] +// newIter returns a pointer to a new uninitialized iterator. The iterator +// should be closed when no longer in use. +func (buf *cmdAppCtxBuf) newIter() *cmdAppCtxBufSlice { + return buf.free.get() } -// cmdAppCtxBufIterator iterates through the entries in an -// cmdAppStateBuf. -type cmdAppCtxBufIterator struct { +// cmdAppCtxBufPtr is a pointer into a cmdAppCtxBuf. +type cmdAppCtxBufPtr struct { idx int32 buf *cmdAppCtxBuf node *cmdAppCtxBufNode } -// init seeks the iterator to the front of buf. It returns true if buf is not -// empty and false if it is. -func (it *cmdAppCtxBufIterator) init(buf *cmdAppCtxBuf) (ok bool) { - *it = cmdAppCtxBufIterator{buf: buf, node: buf.head} - return it.buf.len > 0 +func (ptr *cmdAppCtxBufPtr) valid() bool { + return ptr.idx < ptr.buf.len +} + +func (ptr *cmdAppCtxBufPtr) cur() *cmdAppCtx { + return &ptr.node.buf[ptr.idx%cmdAppCtxBufNodeSize] +} + +func (ptr *cmdAppCtxBufPtr) next() { + ptr.idx++ + if !ptr.valid() { + return + } + if ptr.idx%cmdAppCtxBufNodeSize == 0 { + ptr.node = ptr.node.next + } +} + +// cmdAppCtxBufSlice iterates through the entries in an cmdAppStateBuf. +type cmdAppCtxBufSlice struct { + head, tail cmdAppCtxBufPtr +} + +// init initializes the slice over the entries cmdAppCtxBuf. +func (it *cmdAppCtxBufSlice) init(buf *cmdAppCtxBuf) { + *it = cmdAppCtxBufSlice{ + head: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, + tail: cmdAppCtxBufPtr{idx: buf.len, buf: buf, node: buf.tail}, + } +} + +// initEmpty initializes the slice with a length of 0 and pointing at +// the head of the cmdAppCtxBuf. +func (it *cmdAppCtxBufSlice) initEmpty(buf *cmdAppCtxBuf) { + *it = cmdAppCtxBufSlice{ + head: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, + tail: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, + } +} + +// len returns the length of the slice. +func (it *cmdAppCtxBufSlice) len() int { + return int(it.tail.idx - it.head.idx) } -// cur returns the cmdAppState currently pointed to by it. -func (it *cmdAppCtxBufIterator) cur() *cmdAppCtx { - return &it.node.buf[it.idx%cmdAppCtxBufNodeSize] +// Valid implements the apply.commandIteratorBase interface. +func (it *cmdAppCtxBufSlice) Valid() bool { + return it.len() > 0 } -// isLast returns true if it currently points to the last element in the buffer. 
-func (it *cmdAppCtxBufIterator) isLast() bool { - return it.idx+1 == it.buf.len +// Next implements the apply.commandIteratorBase interface. +func (it *cmdAppCtxBufSlice) Next() { + it.head.next() } -// next moves it to point to the next element. It returns false if it.isLast() -// is true, indicating that there are no more elements in the buffer. -func (it *cmdAppCtxBufIterator) next() bool { - if it.isLast() { - return false +// cur and its variants implement the apply.{Checked,Applied}CommandIterator interface. +func (it *cmdAppCtxBufSlice) cur() *cmdAppCtx { return it.head.cur() } +func (it *cmdAppCtxBufSlice) Cur() apply.Command { return it.head.cur() } +func (it *cmdAppCtxBufSlice) CurChecked() apply.CheckedCommand { return it.head.cur() } +func (it *cmdAppCtxBufSlice) CurApplied() apply.AppliedCommand { return it.head.cur() } + +// append and its variants implement the apply.{Checked,Applied}CommandList interface. +func (it *cmdAppCtxBufSlice) append(cmd *cmdAppCtx) { + cur := it.tail.cur() + if cur == cmd { + // Avoid the copy. + } else { + *cur = *cmd } - it.idx++ - if it.idx%cmdAppCtxBufNodeSize == 0 { - it.node = it.node.next + it.tail.next() +} +func (it *cmdAppCtxBufSlice) Append(cmd apply.Command) { it.append(cmd.(*cmdAppCtx)) } +func (it *cmdAppCtxBufSlice) AppendChecked(cmd apply.CheckedCommand) { it.append(cmd.(*cmdAppCtx)) } +func (it *cmdAppCtxBufSlice) AppendApplied(cmd apply.AppliedCommand) { it.append(cmd.(*cmdAppCtx)) } + +// newList and its variants implement the apply.commandIteratorBase interface. +func (it *cmdAppCtxBufSlice) newList() *cmdAppCtxBufSlice { + it2 := it.head.buf.newIter() + it2.initEmpty(it.head.buf) + return it2 +} +func (it *cmdAppCtxBufSlice) NewList() apply.CommandList { return it.newList() } +func (it *cmdAppCtxBufSlice) NewCheckedList() apply.CheckedCommandList { return it.newList() } +func (it *cmdAppCtxBufSlice) NewAppliedList() apply.AppliedCommandList { return it.newList() } + +// Close implements the apply.commandIteratorBase interface. +func (it *cmdAppCtxBufSlice) Close() { + it.head.buf.free.put(it) +} + +// cmdAppCtxBufSliceFreeList is a free list of cmdAppCtxBufSlice objects that is +// used to avoid memory allocations for short-lived cmdAppCtxBufSlice objects +// that require heap allocation. +type cmdAppCtxBufSliceFreeList struct { + iters [cmdAppCtxBufSliceFreeListSize]cmdAppCtxBufSlice + inUse [cmdAppCtxBufSliceFreeListSize]bool +} + +func (f *cmdAppCtxBufSliceFreeList) put(it *cmdAppCtxBufSlice) { + *it = cmdAppCtxBufSlice{} + for i := range f.iters { + if &f.iters[i] == it { + f.inUse[i] = false + return + } + } +} + +func (f *cmdAppCtxBufSliceFreeList) get() *cmdAppCtxBufSlice { + for i, inUse := range f.inUse { + if !inUse { + f.inUse[i] = true + return &f.iters[i] + } } - return true + return new(cmdAppCtxBufSlice) } diff --git a/pkg/storage/cmd_app_ctx_buf_test.go b/pkg/storage/cmd_app_ctx_buf_test.go index 99979f35271c..b603a11df92e 100644 --- a/pkg/storage/cmd_app_ctx_buf_test.go +++ b/pkg/storage/cmd_app_ctx_buf_test.go @@ -30,24 +30,20 @@ func TestApplicationStateBuf(t *testing.T) { states = append(states, buf.allocate()) assert.Equal(t, i+1, int(buf.len)) } - // Test that last returns the correct value. - last := states[len(states)-1] - assert.Equal(t, last, buf.last()) // Test the iterator. 
- var it cmdAppCtxBufIterator + var it cmdAppCtxBufSlice i := 0 - for ok := it.init(&buf); ok; ok = it.next() { + for it.init(&buf); it.Valid(); it.Next() { assert.Equal(t, states[i], it.cur()) i++ } assert.Equal(t, i, numStates) // make sure we saw them all - assert.True(t, it.isLast()) // Test clear. buf.clear() assert.EqualValues(t, buf, cmdAppCtxBuf{}) assert.Equal(t, 0, int(buf.len)) - assert.Panics(t, func() { buf.last() }) - assert.False(t, it.init(&buf)) + it.init(&buf) + assert.False(t, it.Valid()) // Test clear on an empty buffer. buf.clear() assert.EqualValues(t, buf, cmdAppCtxBuf{}) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 2a1009ec0caf..1dc3f060adaa 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -196,6 +196,10 @@ type Replica struct { stateLoader stateloader.StateLoader // on-disk storage for sideloaded SSTables. nil when there's no ReplicaID. sideloaded SideloadStorage + // decoder is used to decode committed raft entries. + decoder replicaDecoder + // applier is used to apply committed raft entries. + applier replicaApplier } // Contains the lease history when enabled. diff --git a/pkg/storage/replica_application.go b/pkg/storage/replica_application.go deleted file mode 100644 index c8ecc1d231a8..000000000000 --- a/pkg/storage/replica_application.go +++ /dev/null @@ -1,897 +0,0 @@ -// Copyright 2019 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package storage - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/stateloader" - "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/storagepb" - "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/kr/pretty" - "github.com/pkg/errors" - "go.etcd.io/etcd/raft/raftpb" -) - -// handleCommittedEntriesStats returns stats about what happened during the -// application of a set of raft entries. -// -// TODO(ajwerner): add metrics to go with these stats. -type handleCommittedEntriesStats struct { - batchesProcessed int - entriesProcessed int - stateAssertions int - numEmptyEntries int -} - -// handleCommittedEntriesRaftMuLocked deals with the complexities involved in -// moving the Replica's replicated state machine forward given committed raft -// entries. All changes to r.mu.state occur downstream of this call. -// -// The stats return value reflects the number of entries which are processed, -// the number of batches used to process them, the number of entries which -// contained no data (added by etcd raft), and the number of entries which -// required asserting the on-disk state. -// -// Errors returned from this method are fatal! The errExpl is provided as a -// non-sensitive cue of where the error occurred. -// TODO(ajwerner): replace errExpl with proper error composition using the new -// errors library. 
-// -// At a high level, this method receives committed entries which each contains -// the evaluation of a batch (at its heart a WriteBatch, to be applied to the -// underlying storage engine), which it decodes into batches of entries which -// are safe to apply atomically together. -// -// The pseudocode looks like: -// -// while entries: -// for each entry in entries: -// decode front into temp struct -// if entry is non-trivial and batch is not empty: -// break -// add decoded command to batch -// pop front from entries -// if entry is non-trivial: -// break -// prepare local commands -// for each entry in batch: -// check if failed -// if not: -// stage in batch -// commit batch to storage engine -// update Replica.state -// for each entry in batch: -// apply side-effects, ack client, release latches -// -// The processing of committed entries proceeds in 4 stages: decoding, -// local preparation, staging, and application. Commands may be applied together -// so long as their implied state change is "trivial" (see isTrivial). Once -// decoding has discovered a batch boundary, the commands are prepared by -// reading the current replica state from underneath the Replica.mu and -// determining whether any of the commands were proposed locally. Next each -// command is written to the engine.Batch and has the "trivial" component of -// its ReplicatedEvalResult applied to the batch's view of the ReplicaState. -// Finally the batch is written to the storage engine, its side effects on -// the Replica's state are applied, and the clients acked with the respective -// results. -func (r *Replica) handleCommittedEntriesRaftMuLocked( - ctx context.Context, committedEntries []raftpb.Entry, -) (stats handleCommittedEntriesStats, errExpl string, err error) { - var ( - haveNonTrivialEntry bool - toProcess = committedEntries - b = getCmdAppBatch() - ) - defer releaseCmdAppBatch(b) - for len(toProcess) > 0 { - if haveNonTrivialEntry { - // If the previous call to b.decode() informed us that it left a - // non-trivial entry in decodeBuf, use it. - haveNonTrivialEntry = false - b.add(&toProcess[0], b.decodeBuf) - toProcess = toProcess[1:] - } else { - // Decode zero or more trivial entries into b. - var numEmptyEntries int - haveNonTrivialEntry, numEmptyEntries, toProcess, errExpl, err = - b.decode(ctx, toProcess, &b.decodeBuf, r.store.TestingKnobs().MaxApplicationBatchSize) - if err != nil { - return stats, errExpl, err - } - // If no trivial entries were decoded go back around and process the - // non-trivial entry. - if b.cmdBuf.len == 0 { - continue - } - stats.numEmptyEntries += numEmptyEntries - } - r.retrieveLocalProposals(ctx, b) - b.batch = r.store.engine.NewBatch() - // Stage each of the commands which will write them into the newly created - // engine.Batch and update b's view of the replicaState. - var it cmdAppCtxBufIterator - for ok := it.init(&b.cmdBuf); ok; ok = it.next() { - cmd := it.cur() - r.stageRaftCommand(cmd.ctx, cmd, b.batch, &b.replicaState, - it.isLast() /* writeAppliedState */) - // We permit trivial commands to update the truncated state but we need to - // track whether that happens so that the side-effects of truncation occur - // after the batch is committed to storage. 
- updatedTruncatedState := stageTrivialReplicatedEvalResult(cmd.ctx, - cmd.replicatedResult(), cmd.e.Index, cmd.leaseIndex, &b.replicaState) - b.updatedTruncatedState = b.updatedTruncatedState || updatedTruncatedState - } - if errExpl, err = r.applyCmdAppBatch(ctx, b, &stats); err != nil { - return stats, errExpl, err - } - } - return stats, "", nil -} - -// retrieveLocalProposals populates the proposal and ctx fields for each of the -// commands in b. -func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) { - r.mu.Lock() - defer r.mu.Unlock() - b.replicaState = r.mu.state - // Copy stats as it gets updated in-place in applyRaftCommandToBatch. - b.replicaState.Stats = &b.stats - *b.replicaState.Stats = *r.mu.state.Stats - // Assign all the local proposals first then delete all of them from the map - // in a second pass. This ensures that we retrieve all proposals correctly - // even if the batch has multiple entries for the same proposal, in which - // case the proposal was reproposed (either under its original or a new - // MaxLeaseIndex) which we handle in a second pass below. - var anyLocal bool - var it cmdAppCtxBufIterator - for ok := it.init(&b.cmdBuf); ok; ok = it.next() { - cmd := it.cur() - cmd.proposal = r.mu.proposals[cmd.idKey] - if cmd.proposedLocally() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { - // If this entry does not have the most up-to-date view of the - // corresponding proposal's maximum lease index then the proposal - // must have been reproposed with a higher lease index. (see - // tryReproposeWithNewLeaseIndex). In that case, there's a newer - // version of the proposal in the pipeline, so don't consider this - // entry to have been proposed locally. The entry must necessarily be - // rejected by checkForcedErr. - cmd.proposal = nil - } - if cmd.proposedLocally() { - // We initiated this command, so use the caller-supplied context. - cmd.ctx = cmd.proposal.ctx - anyLocal = true - } else { - cmd.ctx = ctx - } - } - if !anyLocal && r.mu.proposalQuota == nil { - // Fast-path. - return - } - for ok := it.init(&b.cmdBuf); ok; ok = it.next() { - cmd := it.cur() - toRelease := int64(0) - if cmd.proposedLocally() { - // Delete the proposal from the proposals map. There may be reproposals - // of the proposal in the pipeline, but those will all have the same max - // lease index, meaning that they will all be rejected after this entry - // applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex - // picks up the proposal on failure, it will re-add the proposal to the - // proposal map, but this won't affect anything in this cmdAppBatch. - // - // While here, add the proposal's quota size to the quota release queue. - // We check the proposal map again first to avoid double free-ing quota - // when reproposals from the same proposal end up in the same entry - // application batch. - delete(r.mu.proposals, cmd.idKey) - toRelease = cmd.proposal.quotaSize - } - // At this point we're not guaranteed to have proposalQuota initialized, - // the same is true for quotaReleaseQueues. Only queue the proposal's - // quota for release if the proposalQuota is initialized - if r.mu.proposalQuota != nil { - r.mu.quotaReleaseQueue = append(r.mu.quotaReleaseQueue, toRelease) - } - } -} - -// stageRaftCommand handles the first phase of applying a command to the -// replica state machine. 
-// -// The proposal also contains auxiliary data which needs to be verified in order -// to decide whether the proposal should be applied: the command's MaxLeaseIndex -// must move the state machine's LeaseAppliedIndex forward, and the proposer's -// lease (or rather its sequence number) must match that of the state machine, -// and lastly the GCThreshold is validated. If any of the checks fail, the -// proposal's content is wiped and we apply an empty log entry instead. If an -// error occurs and the command was proposed locally, the error will be -// communicated to the waiting proposer. The two typical cases in which errors -// occur are lease mismatch (in which case the caller tries to send the command -// to the actual leaseholder) and violation of the LeaseAppliedIndex (in which -// case the proposal is retried if it was proposed locally). -// -// Assuming all checks were passed, the command is applied to the batch, -// which is done by the aptly named applyRaftCommandToBatch. -// -// For trivial proposals this is the whole story, but some commands trigger -// additional code in this method in this method via a side effect (in the -// proposal's ReplicatedEvalResult or, for local proposals, -// LocalEvalResult). These might, for example, trigger an update of the -// Replica's in-memory state to match updates to the on-disk state, or pass -// intents to the intent resolver. Some commands don't fit this simple schema -// and need to hook deeper into the code. Notably splits and merges need to -// acquire locks on their right-hand side Replicas and may need to add data to -// the WriteBatch before it is applied; similarly, changes to the disk layout of -// internal state typically require a migration which shows up here. Any of this -// logic however is deferred until after the batch has been written to the -// storage engine. -func (r *Replica) stageRaftCommand( - ctx context.Context, - cmd *cmdAppCtx, - batch engine.Batch, - replicaState *storagepb.ReplicaState, - writeAppliedState bool, -) { - if cmd.e.Index == 0 { - log.Fatalf(ctx, "processRaftCommand requires a non-zero index") - } - if log.V(4) { - log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", - cmd.idKey, cmd.raftCmd.MaxLeaseIndex) - } - - var ts hlc.Timestamp - if cmd.idKey != "" { - ts = cmd.replicatedResult().Timestamp - } - - cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr(ctx, - cmd.idKey, cmd.raftCmd, cmd.proposal, cmd.proposedLocally(), replicaState) - if cmd.forcedErr == nil { - // Verify that the batch timestamp is after the GC threshold. This is - // necessary because not all commands declare read access on the GC - // threshold key, even though they implicitly depend on it. This means - // that access to this state will not be serialized by latching, - // so we must perform this check upstream and downstream of raft. - // See #14833. - // - // We provide an empty key span because we already know that the Raft - // command is allowed to apply within its key range. This is guaranteed - // by checks upstream of Raft, which perform the same validation, and by - // span latches, which assure that any modifications to the range's - // boundaries will be serialized with this command. Finally, the - // leaseAppliedIndex check in checkForcedErrLocked ensures that replays - // outside of the spanlatch manager's control which break this - // serialization ordering will already by caught and an error will be - // thrown. 
- cmd.forcedErr = roachpb.NewError(r.requestCanProceed(roachpb.RSpan{}, ts)) - } - if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil { - var newPropRetry int - newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{ - CmdID: cmd.idKey, - ReplicatedEvalResult: *cmd.replicatedResult(), - StoreID: r.store.StoreID(), - RangeID: r.RangeID, - }) - if cmd.proposalRetry == 0 { - cmd.proposalRetry = proposalReevaluationReason(newPropRetry) - } - } - if cmd.forcedErr != nil { - // Apply an empty entry. - *cmd.replicatedResult() = storagepb.ReplicatedEvalResult{} - cmd.raftCmd.WriteBatch = nil - cmd.raftCmd.LogicalOpLog = nil - log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr) - } else { - log.Event(ctx, "applying command") - } - - // Acquire the split or merge lock, if necessary. If a split or merge - // command was rejected with a below-Raft forced error then its replicated - // result was just cleared and this will be a no-op. - if splitMergeUnlock, err := r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { - log.Fatalf(ctx, "unable to acquire split lock: %s", err) - } else if splitMergeUnlock != nil { - // Set the splitMergeUnlock on the cmdAppCtx to be called after the batch - // has been applied (see applyBatch). - cmd.splitMergeUnlock = splitMergeUnlock - } - - // Update the node clock with the serviced request. This maintains - // a high water mark for all ops serviced, so that received ops without - // a timestamp specified are guaranteed one higher than any op already - // executed for overlapping keys. - // TODO(ajwerner): coalesce the clock update per batch. - r.store.Clock().Update(ts) - - // If the command was using the deprecated version of the MVCCStats proto, - // migrate it to the new version and clear out the field. - if deprecatedDelta := cmd.replicatedResult().DeprecatedDelta; deprecatedDelta != nil { - if cmd.replicatedResult().Delta != (enginepb.MVCCStatsDelta{}) { - log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd) - } - cmd.replicatedResult().Delta = deprecatedDelta.ToStatsDelta() - cmd.replicatedResult().DeprecatedDelta = nil - } - - // Apply the Raft command to the batch's accumulated state. This may also - // have the effect of mutating cmd.replicatedResult(). - // applyRaftCommandToBatch will return "expected" errors, but may also indicate - // replica corruption (as of now, signaled by a replicaCorruptionError). - // We feed its return through maybeSetCorrupt to act when that happens. - err := r.applyRaftCommandToBatch(cmd.ctx, cmd, replicaState, batch, writeAppliedState) - if err != nil { - // applyRaftCommandToBatch returned an error, which usually indicates - // either a serious logic bug in CockroachDB or a disk - // corruption/out-of-space issue. Make sure that these fail with - // descriptive message so that we can differentiate the root causes. - log.Errorf(ctx, "unable to update the state machine: %+v", err) - // Report the fatal error separately and only with the error, as that - // triggers an optimization for which we directly report the error to - // sentry (which in turn allows sentry to distinguish different error - // types). - log.Fatal(ctx, err) - } - - // AddSSTable ingestions run before the actual batch gets written to the - // storage engine. This makes sure that when the Raft command is applied, - // the ingestion has definitely succeeded. 
Note that we have taken - // precautions during command evaluation to avoid having mutations in the - // WriteBatch that affect the SSTable. Not doing so could result in order - // reversal (and missing values) here. - // - // NB: any command which has an AddSSTable is non-trivial and will be - // applied in its own batch so it's not possible that any other commands - // which precede this command can shadow writes from this SSTable. - if cmd.replicatedResult().AddSSTable != nil { - copied := addSSTablePreApply( - ctx, - r.store.cfg.Settings, - r.store.engine, - r.raftMu.sideloaded, - cmd.e.Term, - cmd.e.Index, - *cmd.replicatedResult().AddSSTable, - r.store.limiters.BulkIOWriteRate, - ) - r.store.metrics.AddSSTableApplications.Inc(1) - if copied { - r.store.metrics.AddSSTableApplicationCopies.Inc(1) - } - cmd.replicatedResult().AddSSTable = nil - } - - if cmd.replicatedResult().Split != nil { - // Splits require a new HardState to be written to the new RHS - // range (and this needs to be atomic with the main batch). This - // cannot be constructed at evaluation time because it differs - // on each replica (votes may have already been cast on the - // uninitialized replica). Write this new hardstate to the batch too. - // See https://github.com/cockroachdb/cockroach/issues/20629 - splitPreApply(ctx, batch, cmd.replicatedResult().Split.SplitTrigger) - } - - if merge := cmd.replicatedResult().Merge; merge != nil { - // Merges require the subsumed range to be atomically deleted when the - // merge transaction commits. - rhsRepl, err := r.store.GetReplica(merge.RightDesc.RangeID) - if err != nil { - log.Fatal(ctx, err) - } - const destroyData = false - err = rhsRepl.preDestroyRaftMuLocked(ctx, batch, batch, merge.RightDesc.NextReplicaID, destroyData) - if err != nil { - log.Fatal(ctx, err) - } - } - - // Provide the command's corresponding logical operations to the Replica's - // rangefeed. Only do so if the WriteBatch is non-nil, in which case the - // rangefeed requires there to be a corresponding logical operation log or - // it will shut down with an error. If the WriteBatch is nil then we expect - // the logical operation log to also be nil. We don't want to trigger a - // shutdown of the rangefeed in that situation, so we don't pass anything to - // the rangefed. If no rangefeed is running at all, this call will be a noop. - if cmd.raftCmd.WriteBatch != nil { - r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog, batch) - } else if cmd.raftCmd.LogicalOpLog != nil { - log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) - } -} - -func checkForcedErr( - ctx context.Context, - idKey storagebase.CmdIDKey, - raftCmd storagepb.RaftCommand, - proposal *ProposalData, - proposedLocally bool, - replicaState *storagepb.ReplicaState, -) (uint64, proposalReevaluationReason, *roachpb.Error) { - leaseIndex := replicaState.LeaseAppliedIndex - isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest - var requestedLease roachpb.Lease - if isLeaseRequest { - requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease - } - if idKey == "" { - // This is an empty Raft command (which is sent by Raft after elections - // to trigger reproposals or during concurrent configuration changes). - // Nothing to do here except making sure that the corresponding batch - // (which is bogus) doesn't get executed (for it is empty and so - // properties like key range are undefined). 
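As a standalone illustration of this empty-command case, the sketch below treats an empty command ID as the signal to reject the entry as a no-op; the helper name and return values are made up, and the real code simply returns the forced error shown next.

package main

import "fmt"

// rejectEmptyCommand models the case described above: an entry proposed by
// raft itself (for example after a leader election) carries no command ID and
// no payload. It must not be executed as a write batch, so it is rejected as
// a no-op; applying the resulting empty command still advances the raft
// applied index but leaves the lease applied index unchanged.
func rejectEmptyCommand(idKey string) (forcedErr string, ok bool) {
	if idKey == "" {
		return "no-op on empty Raft entry", false
	}
	return "", true
}

func main() {
	fmt.Println(rejectEmptyCommand(""))         // rejected as a no-op
	fmt.Println(rejectEmptyCommand("d3adb33f")) // proceeds to the remaining checks
}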
- return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") - } - - // Verify the lease matches the proposer's expectation. We rely on - // the proposer's determination of whether the existing lease is - // held, and can be used, or is expired, and can be replaced. - // Verify checks that the lease has not been modified since proposal - // due to Raft delays / reorderings. - // To understand why this lease verification is necessary, see comments on the - // proposer_lease field in the proto. - leaseMismatch := false - if raftCmd.DeprecatedProposerLease != nil { - // VersionLeaseSequence must not have been active when this was proposed. - // - // This does not prevent the lease race condition described below. The - // reason we don't fix this here as well is because fixing the race - // requires a new cluster version which implies that we'll already be - // using lease sequence numbers and will fall into the case below. - leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*replicaState.Lease) - } else { - leaseMismatch = raftCmd.ProposerLeaseSequence != replicaState.Lease.Sequence - if !leaseMismatch && isLeaseRequest { - // Lease sequence numbers are a reflection of lease equivalency - // between subsequent leases. However, Lease.Equivalent is not fully - // symmetric, meaning that two leases may be Equivalent to a third - // lease but not Equivalent to each other. If these leases are - // proposed under that same third lease, neither will be able to - // detect whether the other has applied just by looking at the - // current lease sequence number because neither will will increment - // the sequence number. - // - // This can lead to inversions in lease expiration timestamps if - // we're not careful. To avoid this, if a lease request's proposer - // lease sequence matches the current lease sequence and the current - // lease sequence also matches the requested lease sequence, we make - // sure the requested lease is Equivalent to current lease. - if replicaState.Lease.Sequence == requestedLease.Sequence { - // It is only possible for this to fail when expiration-based - // lease extensions are proposed concurrently. - leaseMismatch = !replicaState.Lease.Equivalent(requestedLease) - } - - // This is a check to see if the lease we proposed this lease request against is the same - // lease that we're trying to update. We need to check proposal timestamps because - // extensions don't increment sequence numbers. Without this check a lease could - // be extended and then another lease proposed against the original lease would - // be applied over the extension. - if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && - (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *replicaState.Lease.ProposedTS) { - leaseMismatch = true - } - } - } - if leaseMismatch { - log.VEventf( - ctx, 1, - "command proposed from replica %+v with lease #%d incompatible to %v", - raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *replicaState.Lease, - ) - if isLeaseRequest { - // For lease requests we return a special error that - // redirectOnOrAcquireLease() understands. Note that these - // requests don't go through the DistSender. - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *replicaState.Lease, - Requested: requestedLease, - Message: "proposed under invalid lease", - }) - } - // We return a NotLeaseHolderError so that the DistSender retries. 
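The error routing described above can be summarized in a few lines. The sketch below uses simplified local error types in place of roachpb.LeaseRejectedError and roachpb.NotLeaseHolderError; it only illustrates which error a failed lease check produces, not how the callers consume them.

package main

import "fmt"

// Simplified stand-ins for the real error types returned here.
type leaseRejectedError struct{ msg string }
type notLeaseHolderError struct{ msg string }

func (e *leaseRejectedError) Error() string  { return "lease rejected: " + e.msg }
func (e *notLeaseHolderError) Error() string { return "not lease holder: " + e.msg }

// leaseMismatchError models the routing above: a failed lease check on a
// lease request is reported as a lease rejection (consumed by the local
// lease acquisition path), while any other command gets a not-lease-holder
// error that tells the DistSender to retry against the current leaseholder.
func leaseMismatchError(isLeaseRequest bool, proposedSeq, currentSeq int64) error {
	if isLeaseRequest {
		return &leaseRejectedError{msg: "proposed under invalid lease"}
	}
	return &notLeaseHolderError{msg: fmt.Sprintf(
		"command proposed under lease #%d, applying under lease #%d", proposedSeq, currentSeq)}
}

func main() {
	fmt.Println(leaseMismatchError(true, 4, 5))
	fmt.Println(leaseMismatchError(false, 4, 5))
}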
- nlhe := newNotLeaseHolderError( - replicaState.Lease, raftCmd.ProposerReplica.StoreID, replicaState.Desc) - nlhe.CustomMsg = fmt.Sprintf( - "stale proposal: command was proposed under lease #%d but is being applied "+ - "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease) - return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) - } - - if isLeaseRequest { - // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This - // makes sense since lease commands are proposed by anyone, so we can't expect a coherent - // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the - // counter makes sense from a testing perspective. - // - // However, leases get special vetting to make sure we don't give one to a replica that was - // since removed (see #15385 and a comment in redirectOnOrAcquireLease). - if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { - return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ - Existing: *replicaState.Lease, - Requested: requestedLease, - Message: "replica not part of range", - }) - } - } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { - // The happy case: the command is applying at or ahead of the minimal - // permissible index. It's ok if it skips a few slots (as can happen - // during rearrangement); this command will apply, but later ones which - // were proposed at lower indexes may not. Overall though, this is more - // stable and simpler than requiring commands to apply at their exact - // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 - // is otherwise tricky since we can't tell the client to try again - // (reproposals could exist and may apply at the right index, leading - // to a replay), and assigning the required index would be tedious - // seeing that it would have to rewind sometimes. - leaseIndex = raftCmd.MaxLeaseIndex - } else { - // The command is trying to apply at a past log position. That's - // unfortunate and hopefully rare; the client on the proposer will try - // again. Note that in this situation, the leaseIndex does not advance. - retry := proposalNoReevaluation - if proposedLocally { - log.VEventf( - ctx, 1, - "retry proposal %x: applied at lease index %d, required < %d", - proposal.idKey, leaseIndex, raftCmd.MaxLeaseIndex, - ) - retry = proposalIllegalLeaseIndex - } - return leaseIndex, retry, roachpb.NewErrorf( - "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, - ) - } - return leaseIndex, proposalNoReevaluation, nil -} - -// applyRaftCommandToBatch applies a raft command from the replicated log to the -// current batch's view of the underlying state machine. When the state machine -// cannot be updated, an error (which is fatal!) is returned and must be treated -// that way by the caller. -func (r *Replica) applyRaftCommandToBatch( - ctx context.Context, - cmd *cmdAppCtx, - replicaState *storagepb.ReplicaState, - batch engine.Batch, - writeAppliedState bool, -) error { - writeBatch := cmd.raftCmd.WriteBatch - if writeBatch != nil && len(writeBatch.Data) > 0 { - mutationCount, err := engine.RocksDBBatchCount(writeBatch.Data) - if err != nil { - log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) - } else { - cmd.mutationCount = mutationCount - } - } - - // Exploit the fact that a split will result in a full stats - // recomputation to reset the ContainsEstimates flag. 
- // - // TODO(tschottdorf): We want to let the usual MVCCStats-delta - // machinery update our stats for the left-hand side. But there is no - // way to pass up an MVCCStats object that will clear out the - // ContainsEstimates flag. We should introduce one, but the migration - // makes this worth a separate effort (ContainsEstimates would need to - // have three possible values, 'UNCHANGED', 'NO', and 'YES'). - // Until then, we're left with this rather crude hack. - if cmd.replicatedResult().Split != nil { - replicaState.Stats.ContainsEstimates = false - } - - if cmd.e.Index != replicaState.RaftAppliedIndex+1 { - // If we have an out of order index, there's corruption. No sense in - // trying to update anything or running the command. Simply return - // a corruption error. - return errors.Errorf("applied index jumped from %d to %d", - replicaState.RaftAppliedIndex, cmd.e.Index) - } - - if writeBatch != nil { - if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil { - return errors.Wrap(err, "unable to apply WriteBatch") - } - } - - // The only remaining use of the batch is for range-local keys which we know - // have not been previously written within this batch. - // - // TODO(ajwerner): explore the costs and benefits of this use of Distinct(). - // This call requires flushing the batch's writes but should make subsequent - // writes somewhat cheaper. Early exploration showed no win but found that if - // a Distinct() batch could be used for all of command application it could - // be a win. At the time of writing that approach was deemed not safe. - writer := batch.Distinct() - - // Special-cased MVCC stats handling to exploit commutativity of stats delta - // upgrades. Thanks to commutativity, the spanlatch manager does not have to - // serialize on the stats key. - deltaStats := cmd.replicatedResult().Delta.ToStats() - usingAppliedStateKey := replicaState.UsingAppliedStateKey - needAppliedStateMigration := !usingAppliedStateKey && - cmd.replicatedResult().State != nil && - cmd.replicatedResult().State.UsingAppliedStateKey - if needAppliedStateMigration { - // The Raft command wants us to begin using the RangeAppliedState key - // and we haven't performed the migration yet. Delete the old keys - // that this new key is replacing. - // - // NB: entering this branch indicates that the cmd was considered - // non-trivial and therefore placed in its own batch. - err := r.raftMu.stateLoader.MigrateToRangeAppliedStateKey(ctx, writer, &deltaStats) - if err != nil { - return errors.Wrap(err, "unable to migrate to range applied state") - } - usingAppliedStateKey = true - } - - if !writeAppliedState { - // Don't write any applied state, regardless of the technique we'd be using. - } else if usingAppliedStateKey { - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms := *replicaState.Stats - ms.Add(deltaStats) - - // Set the range applied state, which includes the last applied raft and - // lease index along with the mvcc stats, all in one key. - if err := r.raftMu.stateLoader.SetRangeAppliedState(ctx, writer, - cmd.e.Index, cmd.leaseIndex, &ms); err != nil { - return errors.Wrap(err, "unable to set range applied state") - } - } else { - // Advance the last applied index. We use a blind write in order to avoid - // reading the previous applied index keys on every write operation. This - // requires a little additional work in order maintain the MVCC stats. 
- var appliedIndexNewMS enginepb.MVCCStats - if err := r.raftMu.stateLoader.SetLegacyAppliedIndexBlind(ctx, writer, &appliedIndexNewMS, - cmd.e.Index, cmd.leaseIndex); err != nil { - return errors.Wrap(err, "unable to set applied index") - } - deltaStats.SysBytes += appliedIndexNewMS.SysBytes - - r.raftMu.stateLoader.CalcAppliedIndexSysBytes(replicaState.RaftAppliedIndex, replicaState.LeaseAppliedIndex) - - // Note that calling ms.Add will never result in ms.LastUpdateNanos - // decreasing (and thus LastUpdateNanos tracks the maximum LastUpdateNanos - // across all deltaStats). - ms := *replicaState.Stats - ms.Add(deltaStats) - if err := r.raftMu.stateLoader.SetMVCCStats(ctx, writer, &ms); err != nil { - return errors.Wrap(err, "unable to update MVCCStats") - } - } - // We may have modified the effect on the range's stats that the application - // of the command will have. Update the command's stats delta to reflect this. - cmd.replicatedResult().Delta = deltaStats.ToStatsDelta() - - // Close the Distinct() batch here now that we're done writing to it. - writer.Close() - - // NB: it is not sane to use the distinct engine when updating truncated state - // as multiple commands in the same batch could modify that state and the - // below code reads before it writes. - haveTruncatedState := cmd.replicatedResult().State != nil && - cmd.replicatedResult().State.TruncatedState != nil - if haveTruncatedState { - apply, err := handleTruncatedStateBelowRaft(ctx, replicaState.TruncatedState, - cmd.replicatedResult().State.TruncatedState, r.raftMu.stateLoader, batch) - if err != nil { - return err - } - if !apply { - // The truncated state was discarded, so make sure we don't apply - // it to our in-memory state. - cmd.replicatedResult().State.TruncatedState = nil - cmd.replicatedResult().RaftLogDelta = 0 - // TODO(ajwerner): consider moving this code. - // We received a truncation that doesn't apply to us, so we know that - // there's a leaseholder out there with a log that has earlier entries - // than ours. That leader also guided our log size computations by - // giving us RaftLogDeltas for past truncations, and this was likely - // off. Mark our Raft log size is not trustworthy so that, assuming - // we step up as leader at some point in the future, we recompute - // our numbers. - r.mu.Lock() - r.mu.raftLogSizeTrusted = false - r.mu.Unlock() - } - } - - start := timeutil.Now() - - // TODO(ajwerner): This assertion no longer makes much sense. - var assertHS *raftpb.HardState - if util.RaceEnabled && cmd.replicatedResult().Split != nil { - rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID) - oldHS, err := rsl.LoadHardState(ctx, batch) - if err != nil { - return errors.Wrap(err, "unable to load HardState") - } - assertHS = &oldHS - } - - if assertHS != nil { - // Load the HardState that was just committed (if any). - rsl := stateloader.Make(cmd.replicatedResult().Split.RightDesc.RangeID) - newHS, err := rsl.LoadHardState(ctx, batch) - if err != nil { - return errors.Wrap(err, "unable to load HardState") - } - // Assert that nothing moved "backwards". - if newHS.Term < assertHS.Term || - (newHS.Term == assertHS.Term && newHS.Commit < assertHS.Commit) { - log.Fatalf(ctx, "clobbered HardState: %s\n\npreviously: %s\noverwritten with: %s", - pretty.Diff(newHS, *assertHS), pretty.Sprint(*assertHS), pretty.Sprint(newHS)) - } - } - - // TODO(ajwerner): This metric no longer has anything to do with the "Commit" - // of anything. Unfortunately it's exposed in the admin ui of 19.1. 
We should - // remove it from the admin ui for 19.2 and then in 20.1 we can rename it to - // something more appropriate. - elapsed := timeutil.Since(start) - r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) - return nil -} - -// applyCmdAppBatch handles the logic of writing a batch to the storage engine and -// applying it to the Replica state machine. This method clears b for reuse -// before returning. -func (r *Replica) applyCmdAppBatch( - ctx context.Context, b *cmdAppBatch, stats *handleCommittedEntriesStats, -) (errExpl string, err error) { - defer b.reset() - if log.V(4) { - log.Infof(ctx, "flushing batch %v of %d entries", b.replicaState, b.cmdBuf.len) - } - // Entry application is not done without syncing to disk. - // The atomicity guarantees of the batch and the fact that the applied state - // is stored in this batch, ensure that if the batch ends up not being durably - // committed then the entries in this batch will be applied again upon - // startup. - if err := b.batch.Commit(false); err != nil { - log.Fatalf(ctx, "failed to commit Raft entry batch: %v", err) - } - b.batch.Close() - b.batch = nil - // NB: we compute the triviality of the batch here again rather than storing - // it as it may have changed during processing. In particular, the upgrade of - // applied state will have already occurred. - var batchIsNonTrivial bool - // Non-trivial entries are always alone in a batch and thus are last. - if cmd := b.cmdBuf.last(); !isTrivial(cmd.replicatedResult(), b.replicaState.UsingAppliedStateKey) { - batchIsNonTrivial = true - // Deal with locking sometimes associated with complex commands. - if unlock := cmd.splitMergeUnlock; unlock != nil { - defer unlock() - cmd.splitMergeUnlock = nil - } - if cmd.replicatedResult().BlockReads { - r.readOnlyCmdMu.Lock() - defer r.readOnlyCmdMu.Unlock() - cmd.replicatedResult().BlockReads = false - } - } - // Now that the batch is committed we can go about applying the side effects - // of the update to the truncated state. Note that this is safe only if the - // new truncated state is durably on disk (i.e.) synced. See #38566. - // - // NB: even if multiple updates to TruncatedState occurred in this batch we - // coalesce their side effects and only consult the latest - // TruncatedState.Index. - var sideloadTruncationDelta int64 - if b.updatedTruncatedState { - truncState := b.replicaState.TruncatedState - r.store.raftEntryCache.Clear(r.RangeID, truncState.Index+1) - log.VEventf(ctx, 1, "truncating sideloaded storage up to (and including) index %d", truncState.Index) - if sideloadTruncationDelta, _, err = r.raftMu.sideloaded.TruncateTo(ctx, truncState.Index+1); err != nil { - // We don't *have* to remove these entries for correctness. Log a - // loud error, but keep humming along. - log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) - } - } - - // Iterate through the cmds to compute the raft log delta and writeStats. - var mutationCount int - var it cmdAppCtxBufIterator - raftLogDelta := -sideloadTruncationDelta - for ok := it.init(&b.cmdBuf); ok; ok = it.next() { - cmd := it.cur() - raftLogDelta += cmd.replicatedResult().RaftLogDelta - cmd.replicatedResult().RaftLogDelta = 0 - mutationCount += cmd.mutationCount - } - // Record the write activity, passing a 0 nodeID because replica.writeStats - // intentionally doesn't track the origin of the writes. 
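The raft log size bookkeeping in the loop above is easy to miss: the batch's net effect is the sum of every command's RaftLogDelta, reduced by whatever a truncation reclaimed from sideloaded storage, and the running size is clamped at zero rather than allowed to go negative since it may be imprecise. A minimal sketch with plain int64s and a made-up function name:

package main

import "fmt"

// applyRaftLogDeltas models the bookkeeping above: the batch's net effect on
// the tracked raft log size is the sum of each command's RaftLogDelta minus
// the bytes reclaimed from sideloaded storage by a log truncation. The running
// size is clamped at zero rather than allowed to go negative.
func applyRaftLogDeltas(logSize int64, cmdDeltas []int64, sideloadTruncationDelta int64) int64 {
	delta := -sideloadTruncationDelta
	for _, d := range cmdDeltas {
		delta += d
	}
	logSize += delta
	if logSize < 0 {
		logSize = 0
	}
	return logSize
}

func main() {
	// Two appended entries (+512 and +1024 bytes) and a truncation that
	// removed 4096 bytes of sideloaded payloads.
	fmt.Println(applyRaftLogDeltas(2048, []int64{512, 1024}, 4096)) // 0, clamped
}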
- r.writeStats.recordCount(float64(mutationCount), 0 /* nodeID */) - // deltaStats will store the delta from the current state to the new state - // which will be used to update the metrics. - r.mu.Lock() - r.mu.state.RaftAppliedIndex = b.replicaState.RaftAppliedIndex - r.mu.state.LeaseAppliedIndex = b.replicaState.LeaseAppliedIndex - prevStats := *r.mu.state.Stats - *r.mu.state.Stats = *b.replicaState.Stats - if raftLogDelta != 0 { - r.mu.raftLogSize += raftLogDelta - if r.mu.raftLogSize < 0 { - r.mu.raftLogSize = 0 - } - r.mu.raftLogLastCheckSize += raftLogDelta - if r.mu.raftLogLastCheckSize < 0 { - r.mu.raftLogLastCheckSize = 0 - } - } - if b.updatedTruncatedState { - r.mu.state.TruncatedState = b.replicaState.TruncatedState - } - // Check the queuing conditions. - checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize - needsSplitBySize := r.needsSplitBySizeRLocked() - needsMergeBySize := r.needsMergeBySizeRLocked() - r.mu.Unlock() - deltaStats := *b.replicaState.Stats - deltaStats.Subtract(prevStats) - r.store.metrics.addMVCCStats(deltaStats) - // NB: the bootstrap store has a nil split queue. - // TODO(tbg): the above is probably a lie now. - if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(timeutil.Now()) { - r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - // The bootstrap store has a nil merge queue. - // TODO(tbg): the above is probably a lie now. - if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(timeutil.Now()) { - // TODO(tbg): for ranges which are small but protected from merges by - // other means (zone configs etc), this is called on every command, and - // fires off a goroutine each time. Make this trigger (and potentially - // the split one above, though it hasn't been observed to be as - // bothersome) less aggressive. - r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - if checkRaftLog { - r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) - } - for ok := it.init(&b.cmdBuf); ok; ok = it.next() { - cmd := it.cur() - // Reset the context for already applied commands to ensure that - // reproposals at the same MaxLeaseIndex do not record into closed spans. - if cmd.proposedLocally() && cmd.proposal.applied { - cmd.ctx = ctx - } - for _, sc := range cmd.replicatedResult().SuggestedCompactions { - r.store.compactor.Suggest(cmd.ctx, sc) - } - cmd.replicatedResult().SuggestedCompactions = nil - isNonTrivial := batchIsNonTrivial && it.isLast() - if errExpl, err = r.handleRaftCommandResult(cmd.ctx, cmd, isNonTrivial, - b.replicaState.UsingAppliedStateKey); err != nil { - return errExpl, err - } - if isNonTrivial { - stats.stateAssertions++ - } - stats.entriesProcessed++ - } - stats.batchesProcessed++ - return "", nil -} diff --git a/pkg/storage/replica_application_impl.go b/pkg/storage/replica_application_impl.go new file mode 100644 index 000000000000..4729682ddd88 --- /dev/null +++ b/pkg/storage/replica_application_impl.go @@ -0,0 +1,1040 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
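Before diving into the implementations, it may help to see how a caller is expected to drive them. The sketch below wires up simplified, locally defined stand-ins for the apply package's decoder, state machine, batch, and command abstractions; the interface names, method names, and signatures are illustrative only and do not match the package's real API. It shows the decode, stage-and-commit, and apply-side-effects flow that replicaDecoder, replicaAppBatch, and replicaApplier implement below, and it ignores the rule that non-trivial commands get a batch of their own.

package main

// The interfaces below are simplified local stand-ins for the storage/apply
// abstractions implemented in this file; they are not the package's real API.
type command interface {
	AckOutcomeAndFinish() error // acknowledge the outcome to the waiting client
}

type batch interface {
	Stage(command) (command, error) // check the command and stage its writes
	Commit() error                  // atomically commit the staged state
	Close()
}

type stateMachine interface {
	NewBatch() batch
	ApplySideEffects(command) (command, error)
}

type decoder interface {
	DecodeAndBind(entries [][]byte) error // decode entries, bind local proposals
	Commands() []command
	Reset()
}

// applyCommittedEntries sketches the three-stage flow implemented below:
// decode the committed entries, stage and commit their writes in a single
// batch, then apply each command's side effects and acknowledge its client.
func applyCommittedEntries(dec decoder, sm stateMachine, entries [][]byte) error {
	if err := dec.DecodeAndBind(entries); err != nil {
		return err
	}
	defer dec.Reset()

	b := sm.NewBatch()
	defer b.Close()
	staged := make([]command, 0, len(entries))
	for _, cmd := range dec.Commands() {
		checked, err := b.Stage(cmd)
		if err != nil {
			return err
		}
		staged = append(staged, checked)
	}
	if err := b.Commit(); err != nil {
		return err
	}
	for _, checked := range staged {
		applied, err := sm.ApplySideEffects(checked)
		if err != nil {
			return err
		}
		if err := applied.AckOutcomeAndFinish(); err != nil {
			return err
		}
	}
	return nil
}

func main() {} // stand-in interfaces only; nothing concrete is wired up here.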
+ +package storage + +import ( + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/storagepb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/kr/pretty" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/raftpb" +) + +// replica_application_impl.go provides concrete implementations of +// apply.Decoder (replicaDecoder) and apply.StateMachine (replicaApplier), +// allowing Replica to interface with the storage/apply package. +// +// TODO(nvanbenschoten): rename this file back to replica_application.go +// once it has been reviewed. The diff would have been too annoying. + +// replicaDecoder implements the apply.Decoder interface. +// +// The object is capable of decoding committed raft entries into a list of +// cmdAppCtx objects (which implement all variants of apply.Command), binding +// these commands to their local proposals, and providing an iterator over these +// commands. +type replicaDecoder struct { + r *Replica + cmdBuf cmdAppCtxBuf +} + +// getDecoder returns the Replica's apply.Decoder. +func (r *Replica) getDecoder() *replicaDecoder { + d := &r.raftMu.decoder + d.r = r + return d +} + +// DecodeAndBind implements the apply.Decoder interface. +func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry) (bool, error) { + if err := d.decode(ctx, ents); err != nil { + return false, err + } + return d.retrieveLocalProposals(ctx) +} + +// decode decodes the provided entries into the decoder. +func (d *replicaDecoder) decode(ctx context.Context, ents []raftpb.Entry) error { + for i := range ents { + ent := &ents[i] + if _, err := d.cmdBuf.allocate().decode(ctx, ent); err != nil { + return err + } + } + return nil +} + +// retrieveLocalProposals binds each of the decoder's commands to their local +// proposals if they were proposed locally. The method also sets the ctx fields +// on all commands. +func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal bool, _ error) { + d.r.mu.Lock() + defer d.r.mu.Unlock() + // Assign all the local proposals first then delete all of them from the map + // in a second pass. This ensures that we retrieve all proposals correctly + // even if the applier has multiple entries for the same proposal, in which + // case the proposal was reproposed (either under its original or a new + // MaxLeaseIndex) which we handle in a second pass below. + var it cmdAppCtxBufSlice + for it.init(&d.cmdBuf); it.Valid(); it.Next() { + cmd := it.cur() + cmd.proposal = d.r.mu.proposals[cmd.idKey] + if cmd.IsLocal() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + // If this entry does not have the most up-to-date view of the + // corresponding proposal's maximum lease index then the proposal + // must have been reproposed with a higher lease index. (see + // tryReproposeWithNewLeaseIndex). In that case, there's a newer + // version of the proposal in the pipeline, so don't consider this + // entry to have been proposed locally. The entry must necessarily be + // rejected by checkForcedErr. 
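Stated on its own, the binding rule is: an entry is only treated as locally proposed if it carries the proposal's current MaxLeaseIndex. A minimal sketch with a made-up helper name:

package main

import "fmt"

// bindProposal models the rule above: an entry is only bound to its local
// proposal if it carries the proposal's current MaxLeaseIndex. If the
// proposal has since been reproposed at a higher MaxLeaseIndex, a newer
// entry is in the pipeline, so this stale entry is treated as not locally
// proposed and will be rejected by the lease-applied-index check.
func bindProposal(entryMaxLeaseIndex, proposalMaxLeaseIndex uint64) bool {
	return entryMaxLeaseIndex == proposalMaxLeaseIndex
}

func main() {
	fmt.Println(bindProposal(7, 7)) // true: the entry reflects the live proposal
	fmt.Println(bindProposal(7, 9)) // false: the proposal was reproposed at index 9
}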
+ cmd.proposal = nil + } + if cmd.IsLocal() { + // We initiated this command, so use the caller-supplied context. + cmd.ctx = cmd.proposal.ctx + anyLocal = true + } else { + cmd.ctx = ctx + } + } + if !anyLocal && d.r.mu.proposalQuota == nil { + // Fast-path. + return false, nil + } + for it.init(&d.cmdBuf); it.Valid(); it.Next() { + cmd := it.cur() + toRelease := int64(0) + if cmd.IsLocal() { + // Delete the proposal from the proposals map. There may be reproposals + // of the proposal in the pipeline, but those will all have the same max + // lease index, meaning that they will all be rejected after this entry + // applies (successfully or otherwise). If tryReproposeWithNewLeaseIndex + // picks up the proposal on failure, it will re-add the proposal to the + // proposal map, but this won't affect this replicaApplier. + // + // While here, add the proposal's quota size to the quota release queue. + // We check the proposal map again first to avoid double free-ing quota + // when reproposals from the same proposal end up in the same entry + // application batch. + delete(d.r.mu.proposals, cmd.idKey) + toRelease = cmd.proposal.quotaSize + } + // At this point we're not guaranteed to have proposalQuota initialized, + // the same is true for quotaReleaseQueues. Only queue the proposal's + // quota for release if the proposalQuota is initialized. + if d.r.mu.proposalQuota != nil { + d.r.mu.quotaReleaseQueue = append(d.r.mu.quotaReleaseQueue, toRelease) + } + } + return anyLocal, nil +} + +// NewCommandIter implements the apply.Decoder interface. +func (d *replicaDecoder) NewCommandIter() apply.CommandIterator { + it := d.cmdBuf.newIter() + it.init(&d.cmdBuf) + return it +} + +// Reset implements the apply.Decoder interface. +func (d *replicaDecoder) Reset() { + d.cmdBuf.clear() +} + +// applyCommittedEntriesStats returns stats about what happened during the +// application of a set of raft entries. +// +// TODO(ajwerner): add metrics to go with these stats. +type applyCommittedEntriesStats struct { + batchesProcessed int + entriesProcessed int + stateAssertions int + numEmptyEntries int +} + +// replicaApplier implements the apply.StateMachine interface. +// +// The structure coordinates state transitions within the Replica state machine +// due to the application of replicated commands decoded from committed raft +// entries. Commands are applied to the state machine in a multi-stage process +// whereby individual commands are prepared for application relative to the +// current view of ReplicaState and staged in a replicaAppBatch, the batch is +// committed to the Replica's storage engine atomically, and finally the +// side-effects of each command is applied to the Replica's in-memory state. +type replicaApplier struct { + r *Replica + // batch is returned from NewBatch(). + batch replicaAppBatch + // stats are updated during command application and reset by moveStats. + stats applyCommittedEntriesStats +} + +// getApplier returns the Replica's apply.StateMachine. +func (r *Replica) getApplier() *replicaApplier { + a := &r.raftMu.applier + a.r = r + return a +} + +// checkShouldApplyCommand determines whether or not a command should be applied +// to the replicated state machine after it has been committed to the Raft log. +// This decision is deterministic on all replicas, such that a command that is +// rejected "beneath raft" on one replica will be rejected "beneath raft" on all +// replicas. 
+// +// The decision about whether or not to apply a command is a combination of +// three checks: +// 1. verify that the command was proposed under the current lease. This is +// determined using the proposal's ProposerLeaseSequence. +// 2. verify that the command hasn't been re-ordered with other commands that +// were proposed after it and which already applied. This is determined +// using the proposal's MaxLeaseIndex. +// 3. verify that the command isn't in violation of the Range's current +// garbage collection threshold. This is determined using the proposal's +// Timestamp. +// +// The method sets the command's leaseIndex, proposalRetry, and forcedErr +// fields. It returns whether command should be applied or rejected. +func (r *Replica) checkShouldApplyCommand( + ctx context.Context, cmd *cmdAppCtx, replicaState *storagepb.ReplicaState, +) bool { + cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr( + ctx, cmd.idKey, &cmd.raftCmd, cmd.IsLocal(), replicaState, + ) + if filter := r.store.cfg.TestingKnobs.TestingApplyFilter; cmd.forcedErr == nil && filter != nil { + var newPropRetry int + newPropRetry, cmd.forcedErr = filter(storagebase.ApplyFilterArgs{ + CmdID: cmd.idKey, + ReplicatedEvalResult: *cmd.replicatedResult(), + StoreID: r.store.StoreID(), + RangeID: r.RangeID, + }) + if cmd.proposalRetry == 0 { + cmd.proposalRetry = proposalReevaluationReason(newPropRetry) + } + } + return cmd.forcedErr == nil +} + +// TODO(nvanbenschoten): Unit test this function now that it is stateless. +func checkForcedErr( + ctx context.Context, + idKey storagebase.CmdIDKey, + raftCmd *storagepb.RaftCommand, + isLocal bool, + replicaState *storagepb.ReplicaState, +) (uint64, proposalReevaluationReason, *roachpb.Error) { + leaseIndex := replicaState.LeaseAppliedIndex + isLeaseRequest := raftCmd.ReplicatedEvalResult.IsLeaseRequest + var requestedLease roachpb.Lease + if isLeaseRequest { + requestedLease = *raftCmd.ReplicatedEvalResult.State.Lease + } + if idKey == "" { + // This is an empty Raft command (which is sent by Raft after elections + // to trigger reproposals or during concurrent configuration changes). + // Nothing to do here except making sure that the corresponding batch + // (which is bogus) doesn't get executed (for it is empty and so + // properties like key range are undefined). + return leaseIndex, proposalNoReevaluation, roachpb.NewErrorf("no-op on empty Raft entry") + } + + // Verify the lease matches the proposer's expectation. We rely on + // the proposer's determination of whether the existing lease is + // held, and can be used, or is expired, and can be replaced. + // Verify checks that the lease has not been modified since proposal + // due to Raft delays / reorderings. + // To understand why this lease verification is necessary, see comments on the + // proposer_lease field in the proto. + leaseMismatch := false + if raftCmd.DeprecatedProposerLease != nil { + // VersionLeaseSequence must not have been active when this was proposed. + // + // This does not prevent the lease race condition described below. The + // reason we don't fix this here as well is because fixing the race + // requires a new cluster version which implies that we'll already be + // using lease sequence numbers and will fall into the case below. 
+ leaseMismatch = !raftCmd.DeprecatedProposerLease.Equivalent(*replicaState.Lease) + } else { + leaseMismatch = raftCmd.ProposerLeaseSequence != replicaState.Lease.Sequence + if !leaseMismatch && isLeaseRequest { + // Lease sequence numbers are a reflection of lease equivalency + // between subsequent leases. However, Lease.Equivalent is not fully + // symmetric, meaning that two leases may be Equivalent to a third + // lease but not Equivalent to each other. If these leases are + // proposed under that same third lease, neither will be able to + // detect whether the other has applied just by looking at the + // current lease sequence number because neither will will increment + // the sequence number. + // + // This can lead to inversions in lease expiration timestamps if + // we're not careful. To avoid this, if a lease request's proposer + // lease sequence matches the current lease sequence and the current + // lease sequence also matches the requested lease sequence, we make + // sure the requested lease is Equivalent to current lease. + if replicaState.Lease.Sequence == requestedLease.Sequence { + // It is only possible for this to fail when expiration-based + // lease extensions are proposed concurrently. + leaseMismatch = !replicaState.Lease.Equivalent(requestedLease) + } + + // This is a check to see if the lease we proposed this lease request against is the same + // lease that we're trying to update. We need to check proposal timestamps because + // extensions don't increment sequence numbers. Without this check a lease could + // be extended and then another lease proposed against the original lease would + // be applied over the extension. + if raftCmd.ReplicatedEvalResult.PrevLeaseProposal != nil && + (*raftCmd.ReplicatedEvalResult.PrevLeaseProposal != *replicaState.Lease.ProposedTS) { + leaseMismatch = true + } + } + } + if leaseMismatch { + log.VEventf( + ctx, 1, + "command proposed from replica %+v with lease #%d incompatible to %v", + raftCmd.ProposerReplica, raftCmd.ProposerLeaseSequence, *replicaState.Lease, + ) + if isLeaseRequest { + // For lease requests we return a special error that + // redirectOnOrAcquireLease() understands. Note that these + // requests don't go through the DistSender. + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "proposed under invalid lease", + }) + } + // We return a NotLeaseHolderError so that the DistSender retries. + nlhe := newNotLeaseHolderError( + replicaState.Lease, raftCmd.ProposerReplica.StoreID, replicaState.Desc) + nlhe.CustomMsg = fmt.Sprintf( + "stale proposal: command was proposed under lease #%d but is being applied "+ + "under lease: %s", raftCmd.ProposerLeaseSequence, replicaState.Lease) + return leaseIndex, proposalNoReevaluation, roachpb.NewError(nlhe) + } + + if isLeaseRequest { + // Lease commands are ignored by the counter (and their MaxLeaseIndex is ignored). This + // makes sense since lease commands are proposed by anyone, so we can't expect a coherent + // MaxLeaseIndex. Also, lease proposals are often replayed, so not making them update the + // counter makes sense from a testing perspective. + // + // However, leases get special vetting to make sure we don't give one to a replica that was + // since removed (see #15385 and a comment in redirectOnOrAcquireLease). 
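Taken together, checkForcedErr boils down to a small, stateless decision procedure, which is also what makes it unit-testable as the TODO above suggests. The sketch below models only the three basic checks with simplified stand-in types and plain integers instead of hlc timestamps; it leaves out the empty-command path, the deprecated proposer lease, and the lease-request special cases handled by the real function.

package main

import (
	"errors"
	"fmt"
)

// The structs below are simplified stand-ins for the fields of
// storagepb.RaftCommand and storagepb.ReplicaState consulted by these checks.
type replicatedCmd struct {
	proposerLeaseSeq uint64 // sequence of the lease the command was proposed under
	maxLeaseIndex    uint64 // command may only apply while LeaseAppliedIndex < maxLeaseIndex
	timestamp        int64  // timestamp the command evaluated at
}

type rangeState struct {
	leaseSeq          uint64 // sequence of the currently applied lease
	leaseAppliedIndex uint64
	gcThreshold       int64
}

// shouldApply models the deterministic below-raft decision: the command must
// have been proposed under the current lease, it must move the lease applied
// index forward, and its timestamp must be above the GC threshold. On success
// it returns the lease index at which the command applies.
func shouldApply(cmd replicatedCmd, state rangeState) (uint64, error) {
	if cmd.proposerLeaseSeq != state.leaseSeq {
		return state.leaseAppliedIndex, errors.New("proposed under a different lease")
	}
	if state.leaseAppliedIndex >= cmd.maxLeaseIndex {
		return state.leaseAppliedIndex, errors.New("lease index regression; retry the proposal")
	}
	if cmd.timestamp <= state.gcThreshold {
		return state.leaseAppliedIndex, errors.New("timestamp below the GC threshold")
	}
	return cmd.maxLeaseIndex, nil
}

func main() {
	state := rangeState{leaseSeq: 5, leaseAppliedIndex: 10, gcThreshold: 100}
	fmt.Println(shouldApply(replicatedCmd{5, 11, 200}, state)) // 11 <nil>: applies and advances the index
	fmt.Println(shouldApply(replicatedCmd{5, 10, 200}, state)) // rejected: lease index regression
	fmt.Println(shouldApply(replicatedCmd{4, 12, 200}, state)) // rejected: lease mismatch
}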
+ if _, ok := replicaState.Desc.GetReplicaDescriptor(requestedLease.Replica.StoreID); !ok { + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.LeaseRejectedError{ + Existing: *replicaState.Lease, + Requested: requestedLease, + Message: "replica not part of range", + }) + } + } else if replicaState.LeaseAppliedIndex < raftCmd.MaxLeaseIndex { + // The happy case: the command is applying at or ahead of the minimal + // permissible index. It's ok if it skips a few slots (as can happen + // during rearrangement); this command will apply, but later ones which + // were proposed at lower indexes may not. Overall though, this is more + // stable and simpler than requiring commands to apply at their exact + // lease index: Handling the case in which MaxLeaseIndex > oldIndex+1 + // is otherwise tricky since we can't tell the client to try again + // (reproposals could exist and may apply at the right index, leading + // to a replay), and assigning the required index would be tedious + // seeing that it would have to rewind sometimes. + leaseIndex = raftCmd.MaxLeaseIndex + } else { + // The command is trying to apply at a past log position. That's + // unfortunate and hopefully rare; the client on the proposer will try + // again. Note that in this situation, the leaseIndex does not advance. + retry := proposalNoReevaluation + if isLocal { + log.VEventf( + ctx, 1, + "retry proposal %x: applied at lease index %d, required < %d", + idKey, leaseIndex, raftCmd.MaxLeaseIndex, + ) + retry = proposalIllegalLeaseIndex + } + return leaseIndex, retry, roachpb.NewErrorf( + "command observed at lease index %d, but required < %d", leaseIndex, raftCmd.MaxLeaseIndex, + ) + } + + // Verify that the batch timestamp is after the GC threshold. This is + // necessary because not all commands declare read access on the GC + // threshold key, even though they implicitly depend on it. This means + // that access to this state will not be serialized by latching, + // so we must perform this check upstream and downstream of raft. + // See #14833. + ts := raftCmd.ReplicatedEvalResult.Timestamp + if !replicaState.GCThreshold.Less(ts) { + return leaseIndex, proposalNoReevaluation, roachpb.NewError(&roachpb.BatchTimestampBeforeGCError{ + Timestamp: ts, + Threshold: *replicaState.GCThreshold, + }) + } + return leaseIndex, proposalNoReevaluation, nil +} + +// NewBatch implements the apply.StateMachine interface. +func (a *replicaApplier) NewBatch() apply.Batch { + r := a.r + b := &a.batch + b.r = r + b.a = a + b.batch = r.store.engine.NewBatch() + r.mu.RLock() + b.state = r.mu.state + b.state.Stats = &b.stats + *b.state.Stats = *r.mu.state.Stats + r.mu.RUnlock() + b.start = timeutil.Now() + return b +} + +// replicaAppBatch implements the apply.Batch interface. +// +// The structure accumulates state due to the application of raft commands. +// Committed raft commands are applied to the state machine in a multi-stage +// process whereby individual commands are prepared for application relative +// to the current view of ReplicaState and staged in the batch. The batch is +// committed to the state machine's storage engine atomically. +type replicaAppBatch struct { + r *Replica + a *replicaApplier + + // batch accumulates writes implied by the raft entries in this batch. + batch engine.Batch + // state is this batch's view of the replica's state. It is copied from + // under the Replica.mu when the batch is initialized and is updated in + // stageTrivialReplicatedEvalResult. 
+ state storagepb.ReplicaState + // stats is stored on the application batch to avoid an allocation in + // tracking the batch's view of replicaState. All pointer fields in + // replicaState other than Stats are overwritten completely rather than + // updated in-place. + stats enginepb.MVCCStats + // maxTS is the maximum timestamp that any command that was staged in this + // batch was evaluated at. + maxTS hlc.Timestamp + // migrateToAppliedStateKey tracks whether any command in the batch + // triggered a migration to the replica applied state key. If so, this + // migration will be performed when the application batch is committed. + migrateToAppliedStateKey bool + + // Statistics. + entries int + emptyEntries int + mutations int + start time.Time +} + +// Stage implements the apply.Batch interface. The method handles the first +// phase of applying a command to the replica state machine. +// +// The first thing the method does is determine whether the command should be +// applied at all or whether it should be rejected and replaced with an empty +// entry. The determination is based on the following rules: the command's +// MaxLeaseIndex must move the state machine's LeaseAppliedIndex forward, the +// proposer's lease (or rather its sequence number) must match that of the state +// machine, and lastly the GCThreshold must be below the timestamp that the +// command evaluated at. If any of the checks fail, the proposal's content is +// wiped and we apply an empty log entry instead. If a rejected command was +// proposed locally, the error will eventually be communicated to the waiting +// proposer. The two typical cases in which errors occur are lease mismatch (in +// which case the caller tries to send the command to the actual leaseholder) +// and violation of the LeaseAppliedIndex (in which case the proposal is retried +// if it was proposed locally). +// +// Assuming all checks were passed, the command's write batch is applied to the +// application batch. It's trivial ReplicatedState updates are then staged in +// the batch. This allows the batch to make an accurate determination about +// whether to accept or reject the next command that is staged without needing +// to actually update the replica state machine in between. +func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { + cmd := cmdI.(*cmdAppCtx) + ctx := cmd.ctx + if cmd.ent.Index == 0 { + return nil, errors.Errorf("processRaftCommand requires a non-zero index") + } + if idx, applied := cmd.ent.Index, b.state.RaftAppliedIndex; idx != applied+1 { + // If we have an out of order index, there's corruption. No sense in + // trying to update anything or running the command. Simply return. + return nil, errors.Errorf("applied index jumped from %d to %d", applied, idx) + } + if log.V(4) { + log.Infof(ctx, "processing command %x: maxLeaseIndex=%d", cmd.idKey, cmd.raftCmd.MaxLeaseIndex) + } + + // Determine whether the command should be applied to the replicated state + // machine or whether it should be rejected (and replaced by an empty command). + // This check is deterministic on all replicas, so if one replica decides to + // reject a command, all will. + if !b.r.checkShouldApplyCommand(ctx, cmd, &b.state) { + log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.forcedErr) + + // Apply an empty command. 
+ cmd.raftCmd.ReplicatedEvalResult = storagepb.ReplicatedEvalResult{} + cmd.raftCmd.WriteBatch = nil + cmd.raftCmd.LogicalOpLog = nil + } else { + log.Event(ctx, "applying command") + } + + // Acquire the split or merge lock, if necessary. If a split or merge + // command was rejected with a below-Raft forced error then its replicated + // result was just cleared and this will be a no-op. + if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil { + log.Fatalf(ctx, "unable to acquire split lock: %s", err) + } else if splitMergeUnlock != nil { + // Set the splitMergeUnlock on the replicaAppBatch to be called + // after the batch has been applied (see replicaAppBatch.commit). + cmd.splitMergeUnlock = splitMergeUnlock + } + + // Update the batch's max timestamp. + b.maxTS.Forward(cmd.replicatedResult().Timestamp) + + // Normalize the command, accounting for past migrations. + b.migrateReplicatedResult(ctx, cmd) + + // Stage the command's write batch in the application batch. + if err := b.stageWriteBatch(ctx, cmd); err != nil { + return nil, err + } + + // Run any triggers that should occur before the batch is applied. + if err := b.runPreApplyTriggers(ctx, cmd); err != nil { + return nil, err + } + + // Stage the command's trivial ReplicatedState updates in the batch. Any + // non-trivial commands will be in their own batch, so delaying their + // non-trivial ReplicatedState updates until later (without ever staging + // them in the batch) is sufficient. + b.stageTrivialReplicatedEvalResult(ctx, cmd) + b.entries++ + if len(cmd.ent.Data) == 0 { + b.emptyEntries++ + } + + // The command was checked by checkShouldApplyCommand, so it can be returned + // as a apply.CheckedCommand. + return cmd, nil +} + +// migrateReplicatedResult performs any migrations necessary on the command to +// normalize it before applying it to the batch. This may modify the command. +func (b *replicaAppBatch) migrateReplicatedResult(ctx context.Context, cmd *cmdAppCtx) { + // If the command was using the deprecated version of the MVCCStats proto, + // migrate it to the new version and clear out the field. + res := cmd.replicatedResult() + if deprecatedDelta := res.DeprecatedDelta; deprecatedDelta != nil { + if res.Delta != (enginepb.MVCCStatsDelta{}) { + log.Fatalf(ctx, "stats delta not empty but deprecated delta provided: %+v", cmd) + } + res.Delta = deprecatedDelta.ToStatsDelta() + res.DeprecatedDelta = nil + } +} + +// stageWriteBatch applies the command's write batch to the application batch's +// RocksDB batch. This batch is committed to RocksDB in replicaAppBatch.commit. +func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *cmdAppCtx) error { + wb := cmd.raftCmd.WriteBatch + if wb == nil { + return nil + } + if mutations, err := engine.RocksDBBatchCount(wb.Data); err != nil { + log.Errorf(ctx, "unable to read header of committed WriteBatch: %+v", err) + } else { + b.mutations += mutations + } + if err := b.batch.ApplyBatchRepr(wb.Data, false); err != nil { + return errors.Wrap(err, "unable to apply WriteBatch") + } + return nil +} + +// runPreApplyTriggers runs any triggers that must fire before a command is +// applied. It may modify the command's ReplicatedEvalResult. +func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *cmdAppCtx) error { + res := cmd.replicatedResult() + + // AddSSTable ingestions run before the actual batch gets written to the + // storage engine. 
This makes sure that when the Raft command is applied, + // the ingestion has definitely succeeded. Note that we have taken + // precautions during command evaluation to avoid having mutations in the + // WriteBatch that affect the SSTable. Not doing so could result in order + // reversal (and missing values) here. + // + // NB: any command which has an AddSSTable is non-trivial and will be + // applied in its own batch so it's not possible that any other commands + // which precede this command can shadow writes from this SSTable. + if res.AddSSTable != nil { + copied := addSSTablePreApply( + ctx, + b.r.store.cfg.Settings, + b.r.store.engine, + b.r.raftMu.sideloaded, + cmd.ent.Term, + cmd.ent.Index, + *res.AddSSTable, + b.r.store.limiters.BulkIOWriteRate, + ) + b.r.store.metrics.AddSSTableApplications.Inc(1) + if copied { + b.r.store.metrics.AddSSTableApplicationCopies.Inc(1) + } + res.AddSSTable = nil + } + + if res.Split != nil { + // Splits require a new HardState to be written to the new RHS + // range (and this needs to be atomic with the main batch). This + // cannot be constructed at evaluation time because it differs + // on each replica (votes may have already been cast on the + // uninitialized replica). Write this new hardstate to the batch too. + // See https://github.com/cockroachdb/cockroach/issues/20629 + splitPreApply(ctx, b.batch, res.Split.SplitTrigger) + } + + if merge := res.Merge; merge != nil { + // Merges require the subsumed range to be atomically deleted when the + // merge transaction commits. + rhsRepl, err := b.r.store.GetReplica(merge.RightDesc.RangeID) + if err != nil { + log.Fatal(ctx, err) + } + const destroyData = false + if err := rhsRepl.preDestroyRaftMuLocked( + ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, destroyData, + ); err != nil { + log.Fatal(ctx, err) + } + } + + if res.State != nil && res.State.TruncatedState != nil { + if apply, err := handleTruncatedStateBelowRaft( + ctx, b.state.TruncatedState, res.State.TruncatedState, b.r.raftMu.stateLoader, b.batch, + ); err != nil { + return err + } else if !apply { + // The truncated state was discarded, so make sure we don't apply + // it to our in-memory state. + res.State.TruncatedState = nil + res.RaftLogDelta = 0 + // TODO(ajwerner): consider moving this code. + // We received a truncation that doesn't apply to us, so we know that + // there's a leaseholder out there with a log that has earlier entries + // than ours. That leader also guided our log size computations by + // giving us RaftLogDeltas for past truncations, and this was likely + // off. Mark our Raft log size is not trustworthy so that, assuming + // we step up as leader at some point in the future, we recompute + // our numbers. + b.r.mu.Lock() + b.r.mu.raftLogSizeTrusted = false + b.r.mu.Unlock() + } + } + + // Provide the command's corresponding logical operations to the Replica's + // rangefeed. Only do so if the WriteBatch is non-nil, in which case the + // rangefeed requires there to be a corresponding logical operation log or + // it will shut down with an error. If the WriteBatch is nil then we expect + // the logical operation log to also be nil. We don't want to trigger a + // shutdown of the rangefeed in that situation, so we don't pass anything to + // the rangefed. If no rangefeed is running at all, this call will be a noop. 
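The batching convention assumed throughout this file is that only trivial commands share an application batch; anything carrying a split, merge, truncation, migration, or similar side effect is applied in a batch of its own. A minimal sketch of that chunking rule, where booleans stand in for Command.IsTrivial and the function name is made up for illustration:

package main

import "fmt"

// chunkCommands models the batching convention: consecutive trivial commands
// may share one application batch, while any non-trivial command is applied
// in a batch of its own.
func chunkCommands(trivial []bool) [][]int {
	var batches [][]int
	var cur []int
	for i, isTrivial := range trivial {
		if !isTrivial {
			if len(cur) > 0 {
				batches = append(batches, cur)
				cur = nil
			}
			batches = append(batches, []int{i}) // non-trivial: its own batch
			continue
		}
		cur = append(cur, i)
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	// Commands 0-2 and 4 are trivial; command 3 (say, a split) is not.
	fmt.Println(chunkCommands([]bool{true, true, true, false, true})) // [[0 1 2] [3] [4]]
}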
+ if cmd.raftCmd.WriteBatch != nil { + b.r.handleLogicalOpLogRaftMuLocked(ctx, cmd.raftCmd.LogicalOpLog, b.batch) + } else if cmd.raftCmd.LogicalOpLog != nil { + log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) + } + return nil +} + +// stageTrivialReplicatedEvalResult applies the trivial portions of the +// command's ReplicatedEvalResult to the batch's ReplicaState. This function +// modifies the receiver's ReplicaState but does not modify ReplicatedEvalResult +// in order to give the TestingPostApplyFilter testing knob an opportunity to +// inspect the command's ReplicatedEvalResult. +func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(ctx context.Context, cmd *cmdAppCtx) { + if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 { + b.state.RaftAppliedIndex = raftAppliedIndex + } + if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 { + b.state.LeaseAppliedIndex = leaseAppliedIndex + } + res := cmd.replicatedResult() + // Special-cased MVCC stats handling to exploit commutativity of stats delta + // upgrades. Thanks to commutativity, the spanlatch manager does not have to + // serialize on the stats key. + b.state.Stats.Add(res.Delta.ToStats()) + // Exploit the fact that a split will result in a full stats + // recomputation to reset the ContainsEstimates flag. + // + // TODO(tschottdorf): We want to let the usual MVCCStats-delta + // machinery update our stats for the left-hand side. But there is no + // way to pass up an MVCCStats object that will clear out the + // ContainsEstimates flag. We should introduce one, but the migration + // makes this worth a separate effort (ContainsEstimates would need to + // have three possible values, 'UNCHANGED', 'NO', and 'YES'). + // Until then, we're left with this rather crude hack. + if res.Split != nil { + b.state.Stats.ContainsEstimates = false + } + if res.State != nil && res.State.UsingAppliedStateKey && !b.state.UsingAppliedStateKey { + b.migrateToAppliedStateKey = true + } +} + +// Commit implements the apply.Batch interface. The method handles the +// second phase of applying a command to the replica state machine. It writes +// the application batch's accumulated RocksDB batch to the storage engine. +func (b *replicaAppBatch) Commit(ctx context.Context) error { + if log.V(4) { + log.Infof(ctx, "flushing batch %v of %d entries", b.state, b.entries) + } + + // Update the node clock with the maximum timestamp of all commands in the + // batch. This maintains a high water mark for all ops serviced, so that + // received ops without a timestamp specified are guaranteed one higher than + // any op already executed for overlapping keys. + r := b.r + r.store.Clock().Update(b.maxTS) + + // Add the replica applied state key to the write batch. + if err := b.addAppliedStateKeyToBatch(ctx); err != nil { + return err + } + + // Apply the write batch to RockDB. Entry application is done without + // syncing to disk. The atomicity guarantees of the batch and the fact that + // the applied state is stored in this batch, ensure that if the batch ends + // up not being durably committed then the entries in this batch will be + // applied again upon startup. + const sync = false + if err := b.batch.Commit(sync); err != nil { + log.Fatalf(ctx, "failed to commit Raft entry batch: %v", err) + } + b.batch.Close() + b.batch = nil + + // Update the replica's applied indexes and mvcc stats. 
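Commit also forwards the node's clock to the maximum timestamp at which any command in the batch evaluated, maintaining a high-water mark for serviced operations. A minimal sketch of that forward-only update, with plain integers standing in for hlc timestamps and a made-up function name:

package main

import "fmt"

// forward models the clock update performed when the batch commits: the local
// clock's high-water mark is advanced to the maximum timestamp at which any
// command in the batch evaluated, so later operations on overlapping keys are
// guaranteed higher timestamps. The clock never moves backwards.
func forward(clock, batchMaxTS int64) int64 {
	if batchMaxTS > clock {
		return batchMaxTS
	}
	return clock
}

func main() {
	fmt.Println(forward(100, 150)) // 150: the clock moves up to the batch's max timestamp
	fmt.Println(forward(200, 150)) // 200: the clock never moves backwards
}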
+ r.mu.Lock()
+ r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
+ r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex
+ prevStats := *r.mu.state.Stats
+ *r.mu.state.Stats = *b.state.Stats
+
+ // Check the queuing conditions while holding the lock.
+ needsSplitBySize := r.needsSplitBySizeRLocked()
+ needsMergeBySize := r.needsMergeBySizeRLocked()
+ r.mu.Unlock()
+
+ // Record the stats delta in the StoreMetrics.
+ deltaStats := *b.state.Stats
+ deltaStats.Subtract(prevStats)
+ r.store.metrics.addMVCCStats(deltaStats)
+
+ // Record the write activity, passing a 0 nodeID because replica.writeStats
+ // intentionally doesn't track the origin of the writes.
+ b.r.writeStats.recordCount(float64(b.mutations), 0 /* nodeID */)
+
+ // NB: the bootstrap store has a nil split queue.
+ // TODO(tbg): the above is probably a lie now.
+ now := timeutil.Now()
+ if r.store.splitQueue != nil && needsSplitBySize && r.splitQueueThrottle.ShouldProcess(now) {
+ r.store.splitQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
+ }
+ // The bootstrap store has a nil merge queue.
+ // TODO(tbg): the above is probably a lie now.
+ if r.store.mergeQueue != nil && needsMergeBySize && r.mergeQueueThrottle.ShouldProcess(now) {
+ // TODO(tbg): for ranges which are small but protected from merges by
+ // other means (zone configs etc), this is called on every command, and
+ // fires off a goroutine each time. Make this trigger (and potentially
+ // the split one above, though it hasn't been observed to be as
+ // bothersome) less aggressive.
+ r.store.mergeQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now())
+ }
+
+ b.recordStatsOnCommit()
+ return nil
+}
+
+// addAppliedStateKeyToBatch adds the applied state key to the application
+// batch's RocksDB batch. This records the highest raft and lease index that
+// have been applied as of this batch. It also records the Range's mvcc stats.
+func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error {
+ loader := &b.r.raftMu.stateLoader
+ if b.migrateToAppliedStateKey {
+ // A Raft command wants us to begin using the RangeAppliedState key
+ // and we haven't performed the migration yet. Delete the old keys
+ // that this new key is replacing.
+ //
+ // NB: entering this branch indicates that the batch contains only a
+ // single non-trivial command.
+ err := loader.MigrateToRangeAppliedStateKey(ctx, b.batch, b.state.Stats)
+ if err != nil {
+ return errors.Wrap(err, "unable to migrate to range applied state")
+ }
+ b.state.UsingAppliedStateKey = true
+ }
+ if b.state.UsingAppliedStateKey {
+ // Set the range applied state, which includes the last applied raft and
+ // lease index along with the mvcc stats, all in one key.
+ if err := loader.SetRangeAppliedState(
+ ctx, b.batch, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex, b.state.Stats,
+ ); err != nil {
+ return errors.Wrap(err, "unable to set range applied state")
+ }
+ } else {
+ // Advance the last applied index. We use a blind write in order to avoid
+ // reading the previous applied index keys on every write operation. This
+ // requires a little additional work in order to maintain the MVCC stats.
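+ // appliedIndexNewMS accumulates the stats impact of the blindly written
+ // applied index keys so that the difference can be folded into SysBytes below.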
+ var appliedIndexNewMS enginepb.MVCCStats
+ if err := loader.SetLegacyAppliedIndexBlind(
+ ctx, b.batch, &appliedIndexNewMS, b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex,
+ ); err != nil {
+ return errors.Wrap(err, "unable to set applied index")
+ }
+ b.state.Stats.SysBytes += appliedIndexNewMS.SysBytes -
+ loader.CalcAppliedIndexSysBytes(b.state.RaftAppliedIndex, b.state.LeaseAppliedIndex)
+
+ // Set the legacy MVCC stats key.
+ if err := loader.SetMVCCStats(ctx, b.batch, b.state.Stats); err != nil {
+ return errors.Wrap(err, "unable to update MVCCStats")
+ }
+ }
+ return nil
+}
+
+func (b *replicaAppBatch) recordStatsOnCommit() {
+ b.a.stats.entriesProcessed += b.entries
+ b.a.stats.numEmptyEntries += b.emptyEntries
+ b.a.stats.batchesProcessed++
+
+ elapsed := timeutil.Since(b.start)
+ b.r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds())
+}
+
+// Close implements the apply.Batch interface.
+func (b *replicaAppBatch) Close() {
+ if b.batch != nil {
+ b.batch.Close()
+ }
+ *b = replicaAppBatch{}
+}
+
+// ApplySideEffects implements the apply.StateMachine interface. The method
+// handles the third phase of applying a command to the replica state machine.
+//
+// It is called with commands whose write batches have already been committed
+// to the storage engine and whose trivial side-effects have been applied to
+// the Replica's in-memory state. This method deals with applying non-trivial
+// side effects of commands, such as finalizing splits/merges and informing
+// raft about applied config changes.
+func (a *replicaApplier) ApplySideEffects(cmdI apply.CheckedCommand) (apply.AppliedCommand, error) {
+ cmd := cmdI.(*cmdAppCtx)
+ ctx := cmd.ctx
+
+ // Deal with locking during side-effect handling, which is sometimes
+ // associated with complex commands such as splits and merges.
+ if unlock := cmd.splitMergeUnlock; unlock != nil {
+ defer unlock()
+ }
+ if cmd.replicatedResult().BlockReads {
+ cmd.replicatedResult().BlockReads = false
+ a.r.readOnlyCmdMu.Lock()
+ defer a.r.readOnlyCmdMu.Unlock()
+ }
+
+ // Set up the local result prior to handling the ReplicatedEvalResult to
+ // give testing knobs an opportunity to inspect it.
+ a.r.prepareLocalResult(ctx, cmd)
+ if log.ExpensiveLogEnabled(ctx, 2) {
+ log.VEvent(ctx, 2, cmd.localResult.String())
+ }
+
+ // Handle the ReplicatedEvalResult, executing any side effects of the last
+ // state machine transition.
+ //
+ // Note that this must happen after committing (the engine.Batch), but
+ // before notifying a potentially waiting client.
+ clearTrivialReplicatedEvalResultFields(cmd.replicatedResult())
+ if !cmd.IsTrivial() {
+ shouldAssert := a.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult())
+ // NB: Perform state assertion before acknowledging the client.
+ // Some tests (TestRangeStatsInit) assume that once the store has started
+ // and the first range has a lease that there will not be a later hard-state.
+ if shouldAssert {
+ // Assert that the on-disk state doesn't diverge from the in-memory
+ // state as a result of the side effects.
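+ // The assertion re-reads the persisted state with the state loader and
+ // fatals if it does not match the in-memory state.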
+ a.r.mu.Lock() + a.r.assertStateLocked(ctx, a.r.store.Engine()) + a.r.mu.Unlock() + a.stats.stateAssertions++ + } + } else if res := cmd.replicatedResult(); !res.Equal(storagepb.ReplicatedEvalResult{}) { + log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", res) + } + + if cmd.replicatedResult().RaftLogDelta == 0 { + a.r.handleNoRaftLogDeltaResult(ctx) + } + if cmd.localResult != nil { + a.r.handleLocalEvalResult(ctx, *cmd.localResult) + } + if err := a.maybeApplyConfChange(ctx, cmd); err != nil { + return nil, err + } + + // Mark the command as applied and return it as an apply.AppliedCommand. + if cmd.IsLocal() { + if !cmd.Rejected() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { + log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") + } + if cmd.proposal.applied { + // If the command already applied then we shouldn't be "finishing" its + // application again because it should only be able to apply successfully + // once. We expect that when any reproposal for the same command attempts + // to apply it will be rejected by the below raft lease sequence or lease + // index check in checkForcedErr. + if !cmd.Rejected() { + log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd) + } + cmd.proposal = nil + } else { + cmd.proposal.applied = true + } + } + return cmd, nil +} + +// handleNonTrivialReplicatedEvalResult carries out the side-effects of +// non-trivial commands. It is run with the raftMu locked. It is illegal +// to pass a replicatedResult that does not imply any side-effects. +func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( + ctx context.Context, rResult storagepb.ReplicatedEvalResult, +) (shouldAssert bool) { + // Assert that this replicatedResult implies at least one side-effect. + if rResult.Equal(storagepb.ReplicatedEvalResult{}) { + log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult") + } + + if rResult.State != nil { + if rResult.State.TruncatedState != nil { + rResult.RaftLogDelta += a.r.handleTruncatedStateResult(ctx, rResult.State.TruncatedState) + rResult.State.TruncatedState = nil + } + + if (*rResult.State == storagepb.ReplicaState{}) { + rResult.State = nil + } + } + + if rResult.RaftLogDelta != 0 { + a.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta) + rResult.RaftLogDelta = 0 + } + + if rResult.SuggestedCompactions != nil { + a.r.handleSuggestedCompactionsResult(ctx, rResult.SuggestedCompactions) + rResult.SuggestedCompactions = nil + } + + // The rest of the actions are "nontrivial" and may have large effects on the + // in-memory and on-disk ReplicaStates. If any of these actions are present, + // we want to assert that these two states do not diverge. 
+ shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) + if !shouldAssert { + return false + } + + if rResult.Split != nil { + a.r.handleSplitResult(ctx, rResult.Split) + rResult.Split = nil + } + + if rResult.Merge != nil { + a.r.handleMergeResult(ctx, rResult.Merge) + rResult.Merge = nil + } + + if rResult.State != nil { + if newDesc := rResult.State.Desc; newDesc != nil { + a.r.handleDescResult(ctx, newDesc) + rResult.State.Desc = nil + } + + if newLease := rResult.State.Lease; newLease != nil { + a.r.handleLeaseResult(ctx, newLease) + rResult.State.Lease = nil + } + + if newThresh := rResult.State.GCThreshold; newThresh != nil { + a.r.handleGCThresholdResult(ctx, newThresh) + rResult.State.GCThreshold = nil + } + + if rResult.State.UsingAppliedStateKey { + a.r.handleUsingAppliedStateKeyResult(ctx) + rResult.State.UsingAppliedStateKey = false + } + + if (*rResult.State == storagepb.ReplicaState{}) { + rResult.State = nil + } + } + + if rResult.ChangeReplicas != nil { + a.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) + rResult.ChangeReplicas = nil + } + + if rResult.ComputeChecksum != nil { + a.r.handleComputeChecksumResult(ctx, rResult.ComputeChecksum) + rResult.ComputeChecksum = nil + } + + if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { + log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) + } + return true +} + +func (a *replicaApplier) maybeApplyConfChange(ctx context.Context, cmd *cmdAppCtx) error { + switch cmd.ent.Type { + case raftpb.EntryNormal: + if cmd.replicatedResult().ChangeReplicas != nil { + log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd) + } + return nil + case raftpb.EntryConfChange: + if cmd.replicatedResult().ChangeReplicas == nil { + // The command was rejected. + cmd.cc = raftpb.ConfChange{} + } + return a.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + raftGroup.ApplyConfChange(cmd.cc) + return true, nil + }) + default: + panic("unexpected") + } +} + +func (a *replicaApplier) moveStats() applyCommittedEntriesStats { + stats := a.stats + a.stats = applyCommittedEntriesStats{} + return stats +} diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 6f7d74733963..32a61b7649be 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -15,39 +15,38 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/rditer" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/kr/pretty" - "github.com/pkg/errors" - "go.etcd.io/etcd/raft" - "go.etcd.io/etcd/raft/raftpb" ) // isTrivial determines whether the side-effects of a ReplicatedEvalResult are // "trivial". A result is fundamentally considered "trivial" if it does not have // side effects which rely on the written state of the replica exactly matching // the in-memory state of the replica at the corresponding log position. -// Non-trivial commands must be the last entry in a batch so that after -// batch is applied the replica's written and in-memory state correspond to that -// log index. 
+// Non-trivial commands must be applied in their own batch so that after +// the batch is applied the replica's written and in-memory state correspond +// to that log index. // // At the time of writing it is possible that the current conditions are too // strict but they are certainly sufficient. -func isTrivial(r *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool) (ret bool) { +func isTrivial(r *storagepb.ReplicatedEvalResult) bool { // Check if there are any non-trivial State updates. if r.State != nil { stateWhitelist := *r.State - // An entry is non-trivial if an upgrade to UsingAppliedState is - // required. If we're already usingAppliedStateKey or this entry does - // not imply an upgrade then it is trivial. - if usingAppliedStateKey { - stateWhitelist.UsingAppliedStateKey = false + // ReplicaState.Stats was previously non-nullable which caused nodes to + // send a zero-value MVCCStats structure. If the proposal was generated by + // an old node, we'll have decoded that zero-value structure setting + // ReplicaState.Stats to a non-nil value which would trigger the "unhandled + // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. + // TODO(ajwerner): eliminate this case that likely can no longer occur as of + // at least 19.1. + if stateWhitelist.Stats != nil && (*stateWhitelist.Stats == enginepb.MVCCStats{}) { + stateWhitelist.Stats = nil } - if stateWhitelist.TruncatedState != nil { - stateWhitelist.TruncatedState = nil + if stateWhitelist.DeprecatedTxnSpanGCThreshold != nil { + stateWhitelist.DeprecatedTxnSpanGCThreshold = nil } if stateWhitelist != (storagepb.ReplicaState{}) { return false @@ -62,244 +61,35 @@ func isTrivial(r *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool) (re whitelist.DeprecatedDelta = nil whitelist.PrevLeaseProposal = nil whitelist.State = nil - whitelist.RaftLogDelta = 0 - whitelist.SuggestedCompactions = nil return whitelist.Equal(storagepb.ReplicatedEvalResult{}) } -// stageTrivialReplicatedEvalResult applies the trivial portions of replicatedResult to -// the supplied replicaState and returns whether the change implied an update to -// the replica's truncated state. This function modifies replicaState but does -// not modify replicatedResult in order to give the TestingPostApplyFilter testing knob -// an opportunity to inspect the command's ReplicatedEvalResult. -func stageTrivialReplicatedEvalResult( - ctx context.Context, - replicatedResult *storagepb.ReplicatedEvalResult, - raftAppliedIndex, leaseAppliedIndex uint64, - replicaState *storagepb.ReplicaState, -) (truncatedStateUpdated bool) { - deltaStats := replicatedResult.Delta.ToStats() - replicaState.Stats.Add(deltaStats) - if raftAppliedIndex != 0 { - replicaState.RaftAppliedIndex = raftAppliedIndex - } - if leaseAppliedIndex != 0 { - replicaState.LeaseAppliedIndex = leaseAppliedIndex - } - haveState := replicatedResult.State != nil - truncatedStateUpdated = haveState && replicatedResult.State.TruncatedState != nil - if truncatedStateUpdated { - replicaState.TruncatedState = replicatedResult.State.TruncatedState - } - return truncatedStateUpdated -} - // clearTrivialReplicatedEvalResultFields is used to zero out the fields of a // ReplicatedEvalResult that have already been consumed when staging the // corresponding command and applying it to the current batch's view of the // ReplicaState. This function is called after a batch has been written to the // storage engine. For trivial commands this function should result in a zero // value replicatedResult. 
-func clearTrivialReplicatedEvalResultFields( - replicatedResult *storagepb.ReplicatedEvalResult, usingAppliedStateKey bool, -) { +func clearTrivialReplicatedEvalResultFields(r *storagepb.ReplicatedEvalResult) { // Fields for which no action is taken in this method are zeroed so that // they don't trigger an assertion at the end of the application process // (which checks that all fields were handled). - replicatedResult.IsLeaseRequest = false - replicatedResult.Timestamp = hlc.Timestamp{} - replicatedResult.PrevLeaseProposal = nil + r.IsLeaseRequest = false + r.Timestamp = hlc.Timestamp{} + r.PrevLeaseProposal = nil // The state fields cleared here were already applied to the in-memory view of // replica state for this batch. - if haveState := replicatedResult.State != nil; haveState { - replicatedResult.State.Stats = nil - replicatedResult.State.TruncatedState = nil + if haveState := r.State != nil; haveState { + r.State.Stats = nil // Strip the DeprecatedTxnSpanGCThreshold. We don't care about it. // TODO(nvanbenschoten): Remove in 20.1. - replicatedResult.State.DeprecatedTxnSpanGCThreshold = nil - - // If we're already using the AppliedStateKey then there's nothing - // to do. This flag is idempotent so it's ok that we see this flag - // multiple times, but we want to make sure it doesn't cause us to - // perform repeated state assertions, so clear it before the - // shouldAssert determination. - // A reader might wonder if using the value of usingAppliedState key from - // after applying an entire batch is valid to determine whether this command - // implied a transition, but if it had implied a transition then the batch - // would not have been considered trivial and the current view of will still - // be false as complex state transitions are handled after this call. - if usingAppliedStateKey { - replicatedResult.State.UsingAppliedStateKey = false - } - // ReplicaState.Stats was previously non-nullable which caused nodes to - // send a zero-value MVCCStats structure. If the proposal was generated by - // an old node, we'll have decoded that zero-value structure setting - // ReplicaState.Stats to a non-nil value which would trigger the "unhandled - // field in ReplicatedEvalResult" assertion to fire if we didn't clear it. - // TODO(ajwerner): eliminate this case that likely can no longer occur as of - // at least 19.1. - if replicatedResult.State.Stats != nil && (*replicatedResult.State.Stats == enginepb.MVCCStats{}) { - replicatedResult.State.Stats = nil - } - if *replicatedResult.State == (storagepb.ReplicaState{}) { - replicatedResult.State = nil - } - } - replicatedResult.Delta = enginepb.MVCCStatsDelta{} -} - -// handleRaftCommandResult is called after the current cmd's batch has been -// committed to the storage engine and the trivial side-effects have been -// applied to the Replica's in-memory state. This method deals with applying -// non-trivial side effects, notifying waiting clients, releasing latches, -// informing raft about applied config changes, and asserting hard state as -// required. -func (r *Replica) handleRaftCommandResult( - ctx context.Context, cmd *cmdAppCtx, isNonTrivial, usingAppliedStateKey bool, -) (errExpl string, err error) { - // Set up the local result prior to handling the ReplicatedEvalResult to - // give testing knobs an opportunity to inspect it. 
- r.prepareLocalResult(ctx, cmd) - if log.ExpensiveLogEnabled(ctx, 2) { - log.VEvent(ctx, 2, cmd.localResult.String()) - } - - // Handle the ReplicatedEvalResult, executing any side effects of the last - // state machine transition. - // - // Note that this must happen after committing (the engine.Batch), but - // before notifying a potentially waiting client. - clearTrivialReplicatedEvalResultFields(cmd.replicatedResult(), usingAppliedStateKey) - if isNonTrivial { - r.handleComplexReplicatedEvalResult(ctx, *cmd.replicatedResult()) - } else if !cmd.replicatedResult().Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", - cmd.replicatedResult()) - } - - // NB: Perform state assertion before acknowledging the client. - // Some tests (TestRangeStatsInit) assumes that once the store has started - // and the first range has a lease that there will not be a later hard-state. - if isNonTrivial { - // Assert that the on-disk state doesn't diverge from the in-memory - // state as a result of the side effects. - r.mu.Lock() - r.assertStateLocked(ctx, r.store.Engine()) - r.mu.Unlock() - } - - if cmd.localResult != nil { - r.handleLocalEvalResult(ctx, *cmd.localResult) - } - r.finishRaftCommand(ctx, cmd) - switch cmd.e.Type { - case raftpb.EntryNormal: - if cmd.replicatedResult().ChangeReplicas != nil { - log.Fatalf(ctx, "unexpected replication change from command %s", &cmd.raftCmd) - } - case raftpb.EntryConfChange: - if cmd.replicatedResult().ChangeReplicas == nil { - cmd.cc = raftpb.ConfChange{} - } - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.ApplyConfChange(cmd.cc) - return true, nil - }); err != nil { - const errExpl = "during ApplyConfChange" - return errExpl, errors.Wrap(err, errExpl) - } - } - return "", nil -} - -// handleComplexReplicatedEvalResult carries out the side-effects of non-trivial -// commands. It is run with the raftMu locked. It is illegal to pass a -// replicatedResult that does not imply any side-effects. -func (r *Replica) handleComplexReplicatedEvalResult( - ctx context.Context, replicatedResult storagepb.ReplicatedEvalResult, -) { - - // Assert that this replicatedResult implies at least one side-effect. - if replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleComplexReplicatedEvalResult") - } - - // Process Split or Merge. This needs to happen after stats update because - // of the ContainsEstimates hack. - if replicatedResult.Split != nil { - splitPostApply( - r.AnnotateCtx(ctx), - replicatedResult.Split.RHSDelta, - &replicatedResult.Split.SplitTrigger, - r, - ) - replicatedResult.Split = nil - } - - if replicatedResult.Merge != nil { - if err := r.store.MergeRange( - ctx, r, replicatedResult.Merge.LeftDesc, replicatedResult.Merge.RightDesc, replicatedResult.Merge.FreezeStart, - ); err != nil { - // Our in-memory state has diverged from the on-disk state. - log.Fatalf(ctx, "failed to update store after merging range: %s", err) - } - replicatedResult.Merge = nil - } - - // Update the remaining ReplicaState. 
- if replicatedResult.State != nil { - if newDesc := replicatedResult.State.Desc; newDesc != nil { - r.setDesc(ctx, newDesc) - replicatedResult.State.Desc = nil + r.State.DeprecatedTxnSpanGCThreshold = nil + if *r.State == (storagepb.ReplicaState{}) { + r.State = nil } - - if newLease := replicatedResult.State.Lease; newLease != nil { - r.leasePostApply(ctx, *newLease, false /* permitJump */) - replicatedResult.State.Lease = nil - } - - if newThresh := replicatedResult.State.GCThreshold; newThresh != nil { - if (*newThresh != hlc.Timestamp{}) { - r.mu.Lock() - r.mu.state.GCThreshold = newThresh - r.mu.Unlock() - } - replicatedResult.State.GCThreshold = nil - } - - if replicatedResult.State.UsingAppliedStateKey { - r.mu.Lock() - r.mu.state.UsingAppliedStateKey = true - r.mu.Unlock() - replicatedResult.State.UsingAppliedStateKey = false - } - - if (*replicatedResult.State == storagepb.ReplicaState{}) { - replicatedResult.State = nil - } - } - - if change := replicatedResult.ChangeReplicas; change != nil { - if change.ChangeType == roachpb.REMOVE_REPLICA && - r.store.StoreID() == change.Replica.StoreID { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) - } - replicatedResult.ChangeReplicas = nil - } - - if replicatedResult.ComputeChecksum != nil { - r.computeChecksumPostApply(ctx, *replicatedResult.ComputeChecksum) - replicatedResult.ComputeChecksum = nil - } - - if !replicatedResult.Equal(storagepb.ReplicatedEvalResult{}) { - log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(replicatedResult, storagepb.ReplicatedEvalResult{})) } + r.Delta = enginepb.MVCCStatsDelta{} } // prepareLocalResult is performed after the command has been committed to the @@ -311,7 +101,7 @@ func (r *Replica) handleComplexReplicatedEvalResult( // result will zero-out the struct to ensure that is has properly performed all // of the implied side-effects. func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { - if !cmd.proposedLocally() { + if !cmd.IsLocal() { return } @@ -422,46 +212,123 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( return nil } -// finishRaftCommand is called after a command's side effects have been applied -// in order to acknowledge clients and release latches. -func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) { - // When set to true, recomputes the stats for the LHS and RHS of splits and - // makes sure that they agree with the state's range stats. - const expensiveSplitAssertion = false +// Replica.handleXYZResult methods are called by replicaApplier.ApplySideEffects +// when applying non-trivial side effects. As a general rule, there is a method +// for each of the non-trivial fields in ReplicatedEvalResult. Most methods are +// simple enough that they will be inlined. 
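+// For example, ReplicatedEvalResult.Split is handled by handleSplitResult and
+// ReplicatedEvalResult.ComputeChecksum by handleComputeChecksumResult below.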
- if expensiveSplitAssertion && cmd.replicatedResult().Split != nil {
- split := cmd.replicatedResult().Split
- lhsStatsMS := r.GetMVCCStats()
- lhsComputedMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, r.store.Engine(), lhsStatsMS.LastUpdateNanos)
- if err != nil {
- log.Fatal(ctx, err)
- }
+func (r *Replica) handleSplitResult(ctx context.Context, split *storagepb.Split) {
+ splitPostApply(ctx, split.RHSDelta, &split.SplitTrigger, r)
+}
- rightReplica, err := r.store.GetReplica(split.RightDesc.RangeID)
- if err != nil {
- log.Fatal(ctx, err)
- }
+func (r *Replica) handleMergeResult(ctx context.Context, merge *storagepb.Merge) {
+ if err := r.store.MergeRange(
+ ctx, r, merge.LeftDesc, merge.RightDesc, merge.FreezeStart,
+ ); err != nil {
+ // Our in-memory state has diverged from the on-disk state.
+ log.Fatalf(ctx, "failed to update store after merging range: %s", err)
+ }
+}
- rhsStatsMS := rightReplica.GetMVCCStats()
- rhsComputedMS, err := rditer.ComputeStatsForRange(&split.RightDesc, r.store.Engine(), rhsStatsMS.LastUpdateNanos)
- if err != nil {
- log.Fatal(ctx, err)
- }
+func (r *Replica) handleDescResult(ctx context.Context, desc *roachpb.RangeDescriptor) {
+ r.setDesc(ctx, desc)
+}
- if diff := pretty.Diff(lhsStatsMS, lhsComputedMS); len(diff) > 0 {
- log.Fatalf(ctx, "LHS split stats divergence: diff(claimed, computed) = %s", pretty.Diff(lhsStatsMS, lhsComputedMS))
- }
- if diff := pretty.Diff(rhsStatsMS, rhsComputedMS); len(diff) > 0 {
- log.Fatalf(ctx, "RHS split stats divergence diff(claimed, computed) = %s", pretty.Diff(rhsStatsMS, rhsComputedMS))
- }
+func (r *Replica) handleLeaseResult(ctx context.Context, lease *roachpb.Lease) {
+ r.leasePostApply(ctx, *lease, false /* permitJump */)
+}
+
+func (r *Replica) handleTruncatedStateResult(
+ ctx context.Context, t *roachpb.RaftTruncatedState,
+) (raftLogDelta int64) {
+ r.mu.Lock()
+ r.mu.state.TruncatedState = t
+ r.mu.Unlock()
+
+ // Clear any entries in the Raft log entry cache for this range up
+ // to and including the most recently truncated index.
+ r.store.raftEntryCache.Clear(r.RangeID, t.Index+1)
+
+ // Truncate the sideloaded storage. Note that this is safe only if the new truncated state
+ // is durably on disk (i.e., synced). This is true at the time of writing but unfortunately
+ // could rot.
+ log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", t.Index)
+ size, _, err := r.raftMu.sideloaded.TruncateTo(ctx, t.Index+1)
+ if err != nil {
+ // We don't *have* to remove these entries for correctness. Log a
+ // loud error, but keep humming along.
+ log.Errorf(ctx, "while removing sideloaded files during log truncation: %+v", err) } + return -size +} - if cmd.proposedLocally() { - if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex { - log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index") - } - cmd.proposal.finishApplication(cmd.response) - } else if cmd.response.Err != nil { - log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err) +func (r *Replica) handleGCThresholdResult(ctx context.Context, thresh *hlc.Timestamp) { + if thresh.IsEmpty() { + return + } + r.mu.Lock() + r.mu.state.GCThreshold = thresh + r.mu.Unlock() +} + +func (r *Replica) handleUsingAppliedStateKeyResult(ctx context.Context) { + r.mu.Lock() + r.mu.state.UsingAppliedStateKey = true + r.mu.Unlock() +} + +func (r *Replica) handleComputeChecksumResult(ctx context.Context, cc *storagepb.ComputeChecksum) { + r.computeChecksumPostApply(ctx, *cc) +} + +func (r *Replica) handleChangeReplicasResult(ctx context.Context, chng *storagepb.ChangeReplicas) { + if chng.ChangeType == roachpb.REMOVE_REPLICA && r.store.StoreID() == chng.Replica.StoreID { + // This wants to run as late as possible, maximizing the chances + // that the other nodes have finished this command as well (since + // processing the removal from the queue looks up the Range at the + // lease holder, being too early here turns this into a no-op). + r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) + } +} + +func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) { + r.mu.Lock() + defer r.mu.Unlock() + r.mu.raftLogSize += delta + r.mu.raftLogLastCheckSize += delta + // Ensure raftLog{,LastCheck}Size is not negative since it isn't persisted + // between server restarts. + if r.mu.raftLogSize < 0 { + r.mu.raftLogSize = 0 + } + if r.mu.raftLogLastCheckSize < 0 { + r.mu.raftLogLastCheckSize = 0 + } +} + +func (r *Replica) handleNoRaftLogDeltaResult(ctx context.Context) { + // Check for whether to queue the range for Raft log truncation if this is + // not a Raft log truncation command itself. We don't want to check the + // Raft log for truncation on every write operation or even every operation + // which occurs after the Raft log exceeds RaftLogQueueStaleSize. The logic + // below queues the replica for possible Raft log truncation whenever an + // additional RaftLogQueueStaleSize bytes have been written to the Raft + // log. + r.mu.Lock() + checkRaftLog := r.mu.raftLogSize-r.mu.raftLogLastCheckSize >= RaftLogQueueStaleSize + if checkRaftLog { + r.mu.raftLogLastCheckSize = r.mu.raftLogSize + } + r.mu.Unlock() + if checkRaftLog { + r.store.raftLogQueue.MaybeAddAsync(ctx, r, r.store.Clock().Now()) + } +} + +func (r *Replica) handleSuggestedCompactionsResult( + ctx context.Context, scs []storagepb.SuggestedCompaction, +) { + for _, sc := range scs { + r.store.compactor.Suggest(ctx, sc) } } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 02a7269a1adc..75b5bb4032f2 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -127,19 +127,6 @@ type ProposalData struct { // is canceled, it won't be listening to this done channel, and so it can't be // counted on to invoke endCmds itself.) func (proposal *ProposalData) finishApplication(pr proposalResult) { - if proposal.applied { - // If the command already applied then we shouldn't be "finishing" its - // application again because it should only be able to apply successfully - // once. 
We expect that when any reproposal for the same command attempts - // to apply it will be rejected by the below raft lease sequence or lease - // index check in checkForcedErr. - if pr.Err != nil { - return - } - log.Fatalf(proposal.ctx, - "command already applied: %+v; unexpected successful result: %+v", proposal, pr) - } - proposal.applied = true proposal.ec.done(proposal.Request, pr.Reply, pr.Err) proposal.signalProposalResult(pr) if proposal.sp != nil { @@ -161,7 +148,8 @@ func (proposal *ProposalData) signalProposalResult(pr proposalResult) { // TODO(tschottdorf): we should find new homes for the checksum, lease // code, and various others below to leave here only the core logic. -// Not moving anything right now to avoid awkward diffs. +// Not moving anything right now to avoid awkward diffs. These should +// all be moved to replica_application_result.go. func (r *Replica) gcOldChecksumEntriesLocked(now time.Time) { for id, val := range r.mu.checksums { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 1fe2b8c5b303..348f6c080179 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/apply" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" @@ -402,7 +403,7 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { } type handleRaftReadyStats struct { - handleCommittedEntriesStats + applyCommittedEntriesStats } // noSnap can be passed to handleRaftReady when no snapshot should be processed. @@ -717,12 +718,21 @@ func (r *Replica) handleRaftReadyRaftMuLocked( applicationStart := timeutil.Now() if len(rd.CommittedEntries) > 0 { - var expl string - stats.handleCommittedEntriesStats, expl, err = - r.handleCommittedEntriesRaftMuLocked(ctx, rd.CommittedEntries) - if err != nil { - return stats, expl, err + app := r.getApplier() + dec := r.getDecoder() + appTask := apply.MakeTask(app, dec) + appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) + defer appTask.Close() + if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { + // TODO(WIP): Propagate errExpl. + return stats, "", err } + if err := appTask.ApplyCommittedEntries(ctx); err != nil { + // TODO(WIP): Propagate errExpl. + return stats, "", err + } + stats.applyCommittedEntriesStats = app.moveStats() + // etcd raft occasionally adds a nil entry (our own commands are never // empty). This happens in two situations: When a new leader is elected, and // when a config change is dropped due to the "one at a time" rule. 
In both diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 0ecc043db46f..a67f2b44dd13 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -905,7 +905,7 @@ func TestLeaseReplicaNotInDesc(t *testing.T) { } tc.repl.mu.Lock() _, _, pErr := checkForcedErr( - context.Background(), makeIDKey(), raftCmd, nil /* proposal */, false, /* proposedLocally */ + context.Background(), makeIDKey(), &raftCmd, false, /* isLocal */ &tc.repl.mu.state, ) tc.repl.mu.Unlock() @@ -11586,6 +11586,9 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { defer stopper.Stop(ctx) tc.manualClock = hlc.NewManualClock(123) cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond)) + // Set the RaftMaxCommittedSizePerReady so that only a single raft entry is + // applied at a time, which makes it easier to line up the timing of reproposals. + cfg.RaftMaxCommittedSizePerReady = 1 // Set up tracing. tracer := tracing.NewTracer() tracer.Configure(&cfg.Settings.SV) @@ -11603,7 +11606,6 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) { // seen is used to detect the first application of our proposal. var seen bool cfg.TestingKnobs = StoreTestingKnobs{ - MaxApplicationBatchSize: 1, // Set the TestingProposalFilter in order to know the CmdIDKey for our // request by detecting its txnID. TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error { diff --git a/pkg/storage/track_raft_protos.go b/pkg/storage/track_raft_protos.go index 23c1028bd867..24cc6bcf30c9 100644 --- a/pkg/storage/track_raft_protos.go +++ b/pkg/storage/track_raft_protos.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/apply" "github.com/cockroachdb/cockroach/pkg/storage/compactor" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -33,8 +34,8 @@ func funcName(f interface{}) string { // marshaled downstream of raft. It returns a function that removes the // instrumentation and returns the list of downstream-of-raft protos. func TrackRaftProtos() func() []reflect.Type { - // Grab the name of the function that roots all raft operations. - stageRaftFunc := funcName((*Replica).stageRaftCommand) + // Grab the name of the function that roots all raft application. + applyRaftEntryFunc := funcName((*apply.Task).ApplyCommittedEntries) // We only need to track protos that could cause replica divergence // by being written to disk downstream of raft. whitelist := []string{ @@ -104,7 +105,7 @@ func TrackRaftProtos() func() []reflect.Type { break } - if strings.Contains(f.Function, stageRaftFunc) { + if strings.Contains(f.Function, applyRaftEntryFunc) { belowRaftProtos.Lock() belowRaftProtos.inner[t] = struct{}{} belowRaftProtos.Unlock()