From 54a2da7dbddf6d15a3df73e27dddf0974ad69bc8 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 6 Aug 2019 18:02:03 -0400 Subject: [PATCH] storage: rename replica application files and objects This isn't being squashed into the previous diff to keep the diffs there more clear. The renames include: Types: `replicaApplier` -> `replicaStateMachine` `cmdAppCtx` -> `replicatedCmd` `cmdAppCtxBuf` -> `replicatedCmdBuf` Files: `replica_application_impl.go` -> `replica_application_state_machine.go` `cmd_app_ctx.go` -> `replica_application_cmd.go` `cmd_app_ctx_buf.go` -> `replica_application_cmd_buf.go` Release note: None --- pkg/storage/cmd_app_ctx_buf.go | 214 ---------------- pkg/storage/replica.go | 4 +- ..._app_ctx.go => replica_application_cmd.go} | 37 ++- pkg/storage/replica_application_cmd_buf.go | 229 ++++++++++++++++++ ...go => replica_application_cmd_buf_test.go} | 16 +- pkg/storage/replica_application_decoder.go | 15 +- pkg/storage/replica_application_result.go | 24 +- ...o => replica_application_state_machine.go} | 121 ++++----- pkg/storage/replica_raft.go | 6 +- 9 files changed, 355 insertions(+), 311 deletions(-) delete mode 100644 pkg/storage/cmd_app_ctx_buf.go rename pkg/storage/{cmd_app_ctx.go => replica_application_cmd.go} (83%) create mode 100644 pkg/storage/replica_application_cmd_buf.go rename pkg/storage/{cmd_app_ctx_buf_test.go => replica_application_cmd_buf_test.go} (76%) rename pkg/storage/{replica_application_impl.go => replica_application_state_machine.go} (92%) diff --git a/pkg/storage/cmd_app_ctx_buf.go b/pkg/storage/cmd_app_ctx_buf.go deleted file mode 100644 index d49e2afc5bca..000000000000 --- a/pkg/storage/cmd_app_ctx_buf.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright 2019 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. 
-// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package storage - -import ( - "sync" - - "github.com/cockroachdb/cockroach/pkg/storage/apply" -) - -// cmdAppCtxBufNodeSize is the size of the arrays in an cmdAppStateBufNode. -// TODO(ajwerner): justify this number. -const cmdAppCtxBufNodeSize = 8 - -// cmdAppCtxBufSliceFreeListSize is the size of the cmdAppCtxBufSlice free -// list. The size has been tuned for the maximum number of iterators that -// the storage/apply package uses at once. If this size is insufficient -// then the freelist will panic to ensure that regressions are loud. -const cmdAppCtxBufSliceFreeListSize = 3 - -// cmdAppCtxBuf is an allocation-efficient buffer used during the -// application of raft entries. Initialization occurs lazily upon the first -// call to allocate but used cmdAppCtxBuf objects should be released -// explicitly with the clear() method to release the allocated buffers back -// to the pool. -type cmdAppCtxBuf struct { - len int32 - head, tail *cmdAppCtxBufNode - free cmdAppCtxBufSliceFreeList -} - -// cmdAppCtxBufNode is a linked-list element in an cmdAppStateBuf. -type cmdAppCtxBufNode struct { - len int32 - buf [cmdAppCtxBufNodeSize]cmdAppCtx - next *cmdAppCtxBufNode -} - -var cmdAppStateBufNodeSyncPool = sync.Pool{ - New: func() interface{} { return new(cmdAppCtxBufNode) }, -} - -// allocate extends the length of buf by one and returns the newly -// added element. If this is the first call to allocate it will initialize buf. -// After a buf is initialized it should be explicitly destroyed. 
-func (buf *cmdAppCtxBuf) allocate() *cmdAppCtx { - if buf.tail == nil { // lazy initialization - n := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) - buf.head, buf.tail = n, n - } - if buf.tail.len == cmdAppCtxBufNodeSize { - newTail := cmdAppStateBufNodeSyncPool.Get().(*cmdAppCtxBufNode) - buf.tail.next = newTail - buf.tail = newTail - } - ret := &buf.tail.buf[buf.tail.len] - buf.tail.len++ - buf.len++ - return ret -} - -// truncate clears all of the entries currently in a buffer and returns any -// allocated buffers to the pool. -func (buf *cmdAppCtxBuf) clear() { - for buf.head != nil { - buf.len -= buf.head.len - oldHead := buf.head - newHead := oldHead.next - buf.head = newHead - *oldHead = cmdAppCtxBufNode{} - cmdAppStateBufNodeSyncPool.Put(oldHead) - } - *buf = cmdAppCtxBuf{} -} - -// newIter returns a pointer to a new uninitialized iterator. The iterator -// should be closed when no longer in use. -func (buf *cmdAppCtxBuf) newIter() *cmdAppCtxBufSlice { - return buf.free.get() -} - -// cmdAppCtxBufPtr is a pointer into a cmdAppCtxBuf. -type cmdAppCtxBufPtr struct { - idx int32 - buf *cmdAppCtxBuf - node *cmdAppCtxBufNode -} - -func (ptr *cmdAppCtxBufPtr) valid() bool { - return ptr.idx < ptr.buf.len -} - -func (ptr *cmdAppCtxBufPtr) cur() *cmdAppCtx { - return &ptr.node.buf[ptr.idx%cmdAppCtxBufNodeSize] -} - -func (ptr *cmdAppCtxBufPtr) next() { - ptr.idx++ - if !ptr.valid() { - return - } - if ptr.idx%cmdAppCtxBufNodeSize == 0 { - ptr.node = ptr.node.next - } -} - -// cmdAppCtxBufSlice iterates through the entries in an cmdAppStateBuf. -type cmdAppCtxBufSlice struct { - head, tail cmdAppCtxBufPtr -} - -// init initializes the slice over the entries cmdAppCtxBuf. 
-func (it *cmdAppCtxBufSlice) init(buf *cmdAppCtxBuf) { - *it = cmdAppCtxBufSlice{ - head: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, - tail: cmdAppCtxBufPtr{idx: buf.len, buf: buf, node: buf.tail}, - } -} - -// initEmpty initializes the slice with a length of 0 and pointing at -// the head of the cmdAppCtxBuf. -func (it *cmdAppCtxBufSlice) initEmpty(buf *cmdAppCtxBuf) { - *it = cmdAppCtxBufSlice{ - head: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, - tail: cmdAppCtxBufPtr{idx: 0, buf: buf, node: buf.head}, - } -} - -// len returns the length of the slice. -func (it *cmdAppCtxBufSlice) len() int { - return int(it.tail.idx - it.head.idx) -} - -// Valid implements the apply.CommandIteratorBase interface. -func (it *cmdAppCtxBufSlice) Valid() bool { - return it.len() > 0 -} - -// Next implements the apply.CommandIteratorBase interface. -func (it *cmdAppCtxBufSlice) Next() { - it.head.next() -} - -// cur and its variants implement the apply.{Checked,Applied}CommandIterator interface. -func (it *cmdAppCtxBufSlice) cur() *cmdAppCtx { return it.head.cur() } -func (it *cmdAppCtxBufSlice) Cur() apply.Command { return it.head.cur() } -func (it *cmdAppCtxBufSlice) CurChecked() apply.CheckedCommand { return it.head.cur() } -func (it *cmdAppCtxBufSlice) CurApplied() apply.AppliedCommand { return it.head.cur() } - -// append and its variants implement the apply.{Checked,Applied}CommandList interface. -func (it *cmdAppCtxBufSlice) append(cmd *cmdAppCtx) { - cur := it.tail.cur() - if cur == cmd { - // Avoid the copy. - } else { - *cur = *cmd - } - it.tail.next() -} -func (it *cmdAppCtxBufSlice) Append(cmd apply.Command) { it.append(cmd.(*cmdAppCtx)) } -func (it *cmdAppCtxBufSlice) AppendChecked(cmd apply.CheckedCommand) { it.append(cmd.(*cmdAppCtx)) } -func (it *cmdAppCtxBufSlice) AppendApplied(cmd apply.AppliedCommand) { it.append(cmd.(*cmdAppCtx)) } - -// newList and its variants implement the apply.{Checked}CommandIterator interface. 
-func (it *cmdAppCtxBufSlice) newList() *cmdAppCtxBufSlice { - it2 := it.head.buf.newIter() - it2.initEmpty(it.head.buf) - return it2 -} -func (it *cmdAppCtxBufSlice) NewList() apply.CommandList { return it.newList() } -func (it *cmdAppCtxBufSlice) NewCheckedList() apply.CheckedCommandList { return it.newList() } -func (it *cmdAppCtxBufSlice) NewAppliedList() apply.AppliedCommandList { return it.newList() } - -// Close implements the apply.CommandIteratorBase interface. -func (it *cmdAppCtxBufSlice) Close() { - it.head.buf.free.put(it) -} - -// cmdAppCtxBufSliceFreeList is a free list of cmdAppCtxBufSlice objects that is -// used to avoid memory allocations for short-lived cmdAppCtxBufSlice objects -// that require heap allocation. -type cmdAppCtxBufSliceFreeList struct { - iters [cmdAppCtxBufSliceFreeListSize]cmdAppCtxBufSlice - inUse [cmdAppCtxBufSliceFreeListSize]bool -} - -func (f *cmdAppCtxBufSliceFreeList) put(it *cmdAppCtxBufSlice) { - *it = cmdAppCtxBufSlice{} - for i := range f.iters { - if &f.iters[i] == it { - f.inUse[i] = false - return - } - } -} - -func (f *cmdAppCtxBufSliceFreeList) get() *cmdAppCtxBufSlice { - for i, inUse := range f.inUse { - if !inUse { - f.inUse[i] = true - return &f.iters[i] - } - } - panic("cmdAppCtxBufSliceFreeList has no free elements. Is cmdAppCtxBufSliceFreeListSize " + - "tuned properly? Are we leaking iterators by not calling Close?") -} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 1dc3f060adaa..9ac2ac0dbfbd 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -196,10 +196,10 @@ type Replica struct { stateLoader stateloader.StateLoader // on-disk storage for sideloaded SSTables. nil when there's no ReplicaID. sideloaded SideloadStorage + // stateMachine is used to apply committed raft entries. + stateMachine replicaStateMachine // decoder is used to decode committed raft entries. decoder replicaDecoder - // applier is used to apply committed raft entries. 
- applier replicaApplier } // Contains the lease history when enabled. diff --git a/pkg/storage/cmd_app_ctx.go b/pkg/storage/replica_application_cmd.go similarity index 83% rename from pkg/storage/cmd_app_ctx.go rename to pkg/storage/replica_application_cmd.go index e097eaff1090..a23162d599f9 100644 --- a/pkg/storage/cmd_app_ctx.go +++ b/pkg/storage/replica_application_cmd.go @@ -22,7 +22,18 @@ import ( "go.etcd.io/etcd/raft/raftpb" ) -// cmdAppCtx stores the state required to apply a single raft entry to a +// replica_application_*.go files provide concrete implementations of +// the interfaces defined in the storage/apply package: +// +// replica_application_state_machine.go -> apply.StateMachine +// replica_application_decoder.go -> apply.Decoder +// replica_application_cmd.go -> apply.Command (and variants) +// replica_application_cmd_buf.go -> apply.CommandIterator (and variants) +// replica_application_cmd_buf.go -> apply.CommandList (and variants) +// +// These allow Replica to interface with the storage/apply package. + +// replicatedCmd stores the state required to apply a single raft entry to a // replica. The state is accumulated in stages which occur in apply.Task. From // a high level, the command is decoded from a committed raft entry, then if it // was proposed locally the proposal is populated from the replica's proposals @@ -30,7 +41,7 @@ import ( // to the batch's engine.Batch and applying its "trivial" side-effects to the // batch's view of ReplicaState. Then the batch is committed, the side-effects // are applied and the local result is processed. -type cmdAppCtx struct { +type replicatedCmd struct { ent *raftpb.Entry // the raft.Entry being applied decodedRaftEntry // decoded from ent @@ -73,38 +84,34 @@ type decodedConfChange struct { ccCtx ConfChangeContext } -// decode decodes the entry e into the cmdAppCtx. -func (c *cmdAppCtx) decode(ctx context.Context, e *raftpb.Entry) error { +// decode decodes the entry e into the replicatedCmd. 
+func (c *replicatedCmd) decode(ctx context.Context, e *raftpb.Entry) error { c.ent = e return c.decodedRaftEntry.decode(ctx, e) } -func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { - return &d.raftCmd.ReplicatedEvalResult -} - // Index implements the apply.Command interface. -func (c *cmdAppCtx) Index() uint64 { +func (c *replicatedCmd) Index() uint64 { return c.ent.Index } // IsTrivial implements the apply.Command interface. -func (c *cmdAppCtx) IsTrivial() bool { +func (c *replicatedCmd) IsTrivial() bool { return isTrivial(c.replicatedResult()) } // IsLocal implements the apply.Command interface. -func (c *cmdAppCtx) IsLocal() bool { +func (c *replicatedCmd) IsLocal() bool { return c.proposal != nil } // Rejected implements the apply.CheckedCommand interface. -func (c *cmdAppCtx) Rejected() bool { +func (c *replicatedCmd) Rejected() bool { return c.forcedErr != nil } // FinishAndAckOutcome implements the apply.AppliedCommand interface. -func (c *cmdAppCtx) FinishAndAckOutcome() error { +func (c *replicatedCmd) FinishAndAckOutcome() error { if !c.IsLocal() { return nil } @@ -158,3 +165,7 @@ func (d *decodedRaftEntry) decodeConfChangeEntry(e *raftpb.Entry) error { d.idKey = storagebase.CmdIDKey(d.ccCtx.CommandID) return nil } + +func (d *decodedRaftEntry) replicatedResult() *storagepb.ReplicatedEvalResult { + return &d.raftCmd.ReplicatedEvalResult +} diff --git a/pkg/storage/replica_application_cmd_buf.go b/pkg/storage/replica_application_cmd_buf.go new file mode 100644 index 000000000000..5e0c8ae51815 --- /dev/null +++ b/pkg/storage/replica_application_cmd_buf.go @@ -0,0 +1,229 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "sync" + + "github.com/cockroachdb/cockroach/pkg/storage/apply" +) + +// replica_application_*.go files provide concrete implementations of +// the interfaces defined in the storage/apply package: +// +// replica_application_state_machine.go -> apply.StateMachine +// replica_application_decoder.go -> apply.Decoder +// replica_application_cmd.go -> apply.Command (and variants) +// replica_application_cmd_buf.go -> apply.CommandIterator (and variants) +// replica_application_cmd_buf.go -> apply.CommandList (and variants) +// +// These allow Replica to interface with the storage/apply package. + +// replicatedCmdBufNodeSize is the size of arrays in a replicatedCmdBufNode. +const replicatedCmdBufNodeSize = 8 + +// replicatedCmdBufSliceFreeListSize is the size of the replicatedCmdBufSlice +// free list. The size has been tuned for the maximum number of iterators that +// the storage/apply package uses at once. If this size is insufficient then the +// freelist will panic to ensure that regressions are loud. +const replicatedCmdBufSliceFreeListSize = 3 + +// replicatedCmdBuf is an allocation-efficient buffer used during the +// application of raft entries. Initialization occurs lazily upon the first +// call to allocate but used replicatedCmdBuf objects should be released +// explicitly with the clear() method to release the allocated buffers back +// to the pool. +type replicatedCmdBuf struct { + len int32 + head, tail *replicatedCmdBufNode + free replicatedCmdBufSliceFreeList +} + +// replicatedCmdBufNode is a linked-list element in a replicatedCmdBuf.
+type replicatedCmdBufNode struct { + len int32 + buf [replicatedCmdBufNodeSize]replicatedCmd + next *replicatedCmdBufNode +} + +var replicatedCmdBufBufNodeSyncPool = sync.Pool{ + New: func() interface{} { return new(replicatedCmdBufNode) }, +} + +// allocate extends the length of buf by one and returns the newly added +// element. If this is the first call to allocate it will initialize buf. +// After a buf is initialized it should be explicitly destroyed. +func (buf *replicatedCmdBuf) allocate() *replicatedCmd { + if buf.tail == nil { // lazy initialization + n := replicatedCmdBufBufNodeSyncPool.Get().(*replicatedCmdBufNode) + buf.head, buf.tail = n, n + } + if buf.tail.len == replicatedCmdBufNodeSize { + newTail := replicatedCmdBufBufNodeSyncPool.Get().(*replicatedCmdBufNode) + buf.tail.next = newTail + buf.tail = newTail + } + ret := &buf.tail.buf[buf.tail.len] + buf.tail.len++ + buf.len++ + return ret +} + +// clear removes all of the entries currently in a buffer and returns any +// allocated buffers to the pool. +func (buf *replicatedCmdBuf) clear() { + for buf.head != nil { + buf.len -= buf.head.len + oldHead := buf.head + newHead := oldHead.next + buf.head = newHead + *oldHead = replicatedCmdBufNode{} + replicatedCmdBufBufNodeSyncPool.Put(oldHead) + } + *buf = replicatedCmdBuf{} +} + +// newIter returns a pointer to a new uninitialized iterator. The iterator +// should be closed when no longer in use. +func (buf *replicatedCmdBuf) newIter() *replicatedCmdBufSlice { + return buf.free.get() +} + +// replicatedCmdBufPtr is a pointer into a replicatedCmdBuf.
+type replicatedCmdBufPtr struct { + idx int32 + buf *replicatedCmdBuf + node *replicatedCmdBufNode +} + +func (ptr *replicatedCmdBufPtr) valid() bool { + return ptr.idx < ptr.buf.len +} + +func (ptr *replicatedCmdBufPtr) cur() *replicatedCmd { + return &ptr.node.buf[ptr.idx%replicatedCmdBufNodeSize] +} + +func (ptr *replicatedCmdBufPtr) next() { + ptr.idx++ + if !ptr.valid() { + return + } + if ptr.idx%replicatedCmdBufNodeSize == 0 { + ptr.node = ptr.node.next + } +} + +// replicatedCmdBufSlice iterates through the entries in a replicatedCmdBuf. +type replicatedCmdBufSlice struct { + head, tail replicatedCmdBufPtr +} + +// init initializes the slice over the entries in replicatedCmdBuf. +func (it *replicatedCmdBufSlice) init(buf *replicatedCmdBuf) { + *it = replicatedCmdBufSlice{ + head: replicatedCmdBufPtr{idx: 0, buf: buf, node: buf.head}, + tail: replicatedCmdBufPtr{idx: buf.len, buf: buf, node: buf.tail}, + } +} + +// initEmpty initializes the slice with a length of 0 and pointing at +// the head of the replicatedCmdBuf. +func (it *replicatedCmdBufSlice) initEmpty(buf *replicatedCmdBuf) { + *it = replicatedCmdBufSlice{ + head: replicatedCmdBufPtr{idx: 0, buf: buf, node: buf.head}, + tail: replicatedCmdBufPtr{idx: 0, buf: buf, node: buf.head}, + } +} + +// len returns the length of the slice. +func (it *replicatedCmdBufSlice) len() int { + return int(it.tail.idx - it.head.idx) +} + +// Valid implements the apply.CommandIteratorBase interface. +func (it *replicatedCmdBufSlice) Valid() bool { + return it.len() > 0 +} + +// Next implements the apply.CommandIteratorBase interface. +func (it *replicatedCmdBufSlice) Next() { + it.head.next() +} + +// cur and its variants implement the apply.{Checked,Applied}CommandIterator interface.
+func (it *replicatedCmdBufSlice) cur() *replicatedCmd { return it.head.cur() } +func (it *replicatedCmdBufSlice) Cur() apply.Command { return it.head.cur() } +func (it *replicatedCmdBufSlice) CurChecked() apply.CheckedCommand { return it.head.cur() } +func (it *replicatedCmdBufSlice) CurApplied() apply.AppliedCommand { return it.head.cur() } + +// append and its variants implement the apply.{Checked,Applied}CommandList interface. +func (it *replicatedCmdBufSlice) append(cmd *replicatedCmd) { + cur := it.tail.cur() + if cur == cmd { + // Avoid the copy. + } else { + *cur = *cmd + } + it.tail.next() +} +func (it *replicatedCmdBufSlice) Append(cmd apply.Command) { it.append(cmd.(*replicatedCmd)) } +func (it *replicatedCmdBufSlice) AppendChecked(cmd apply.CheckedCommand) { + it.append(cmd.(*replicatedCmd)) +} +func (it *replicatedCmdBufSlice) AppendApplied(cmd apply.AppliedCommand) { + it.append(cmd.(*replicatedCmd)) +} + +// newList and its variants implement the apply.{Checked}CommandIterator interface. +func (it *replicatedCmdBufSlice) newList() *replicatedCmdBufSlice { + it2 := it.head.buf.newIter() + it2.initEmpty(it.head.buf) + return it2 +} +func (it *replicatedCmdBufSlice) NewList() apply.CommandList { return it.newList() } +func (it *replicatedCmdBufSlice) NewCheckedList() apply.CheckedCommandList { return it.newList() } +func (it *replicatedCmdBufSlice) NewAppliedList() apply.AppliedCommandList { return it.newList() } + +// Close implements the apply.CommandIteratorBase interface. +func (it *replicatedCmdBufSlice) Close() { + it.head.buf.free.put(it) +} + +// replicatedCmdBufSliceFreeList is a free list of replicatedCmdBufSlice +// objects that is used to avoid memory allocations for short-lived +// replicatedCmdBufSlice objects that require heap allocation. 
+type replicatedCmdBufSliceFreeList struct { + iters [replicatedCmdBufSliceFreeListSize]replicatedCmdBufSlice + inUse [replicatedCmdBufSliceFreeListSize]bool +} + +func (f *replicatedCmdBufSliceFreeList) put(it *replicatedCmdBufSlice) { + *it = replicatedCmdBufSlice{} + for i := range f.iters { + if &f.iters[i] == it { + f.inUse[i] = false + return + } + } +} + +func (f *replicatedCmdBufSliceFreeList) get() *replicatedCmdBufSlice { + for i, inUse := range f.inUse { + if !inUse { + f.inUse[i] = true + return &f.iters[i] + } + } + panic("replicatedCmdBufSliceFreeList has no free elements. Is " + + "replicatedCmdBufSliceFreeListSize tuned properly? Are we " + + "leaking iterators by not calling Close?") +} diff --git a/pkg/storage/cmd_app_ctx_buf_test.go b/pkg/storage/replica_application_cmd_buf_test.go similarity index 76% rename from pkg/storage/cmd_app_ctx_buf_test.go rename to pkg/storage/replica_application_cmd_buf_test.go index b603a11df92e..3814aca54d1c 100644 --- a/pkg/storage/cmd_app_ctx_buf_test.go +++ b/pkg/storage/replica_application_cmd_buf_test.go @@ -17,21 +17,21 @@ import ( "github.com/stretchr/testify/assert" ) -// TestEntryApplicationStateBuf verifies the entryApplicationStateBuf behavior. -func TestApplicationStateBuf(t *testing.T) { +// TestReplicatedCmdBuf verifies the replicatedCmdBuf behavior. +func TestReplicatedCmdBuf(t *testing.T) { defer leaktest.AfterTest(t)() - var buf cmdAppCtxBuf + var buf replicatedCmdBuf // numStates is chosen arbitrarily. - const numStates = 5*cmdAppCtxBufNodeSize + 1 + const numStates = 5*replicatedCmdBufNodeSize + 1 // Test that the len field is properly updated. - var states []*cmdAppCtx + var states []*replicatedCmd for i := 0; i < numStates; i++ { assert.Equal(t, i, int(buf.len)) states = append(states, buf.allocate()) assert.Equal(t, i+1, int(buf.len)) } // Test the iterator. 
- var it cmdAppCtxBufSlice + var it replicatedCmdBufSlice i := 0 for it.init(&buf); it.Valid(); it.Next() { assert.Equal(t, states[i], it.cur()) @@ -40,11 +40,11 @@ func TestApplicationStateBuf(t *testing.T) { assert.Equal(t, i, numStates) // make sure we saw them all // Test clear. buf.clear() - assert.EqualValues(t, buf, cmdAppCtxBuf{}) + assert.EqualValues(t, buf, replicatedCmdBuf{}) assert.Equal(t, 0, int(buf.len)) it.init(&buf) assert.False(t, it.Valid()) // Test clear on an empty buffer. buf.clear() - assert.EqualValues(t, buf, cmdAppCtxBuf{}) + assert.EqualValues(t, buf, replicatedCmdBuf{}) } diff --git a/pkg/storage/replica_application_decoder.go b/pkg/storage/replica_application_decoder.go index 25541341dda1..92ad6358cae6 100644 --- a/pkg/storage/replica_application_decoder.go +++ b/pkg/storage/replica_application_decoder.go @@ -20,22 +20,23 @@ import ( // replica_application_*.go files provide concrete implementations of // the interfaces defined in the storage/apply package: // -// replica_application_decoder.go => apply.Decoder -// replica_application_applier.go => apply.StateMachine -// replica_application_cmd.go => apply.Command (and variants) -// replica_application_cmd_iter.go => apply.CommandIterator (and variants) +// replica_application_state_machine.go -> apply.StateMachine +// replica_application_decoder.go -> apply.Decoder +// replica_application_cmd.go -> apply.Command (and variants) +// replica_application_cmd_buf.go -> apply.CommandIterator (and variants) +// replica_application_cmd_buf.go -> apply.CommandList (and variants) // // These allow Replica to interface with the storage/apply package. // replicaDecoder implements the apply.Decoder interface. 
// // The object is capable of decoding committed raft entries into a list of -// cmdAppCtx objects (which implement all variants of apply.Command), binding +// replicatedCmd objects (which implement all variants of apply.Command), binding // these commands to their local proposals, and providing an iterator over these // commands. type replicaDecoder struct { r *Replica - cmdBuf cmdAppCtxBuf + cmdBuf replicatedCmdBuf } // getDecoder returns the Replica's apply.Decoder. The Replica's raftMu @@ -76,7 +77,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b // even if the applier has multiple entries for the same proposal, in which // case the proposal was reproposed (either under its original or a new // MaxLeaseIndex) which we handle in a second pass below. - var it cmdAppCtxBufSlice + var it replicatedCmdBufSlice for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() cmd.proposal = d.r.mu.proposals[cmd.idKey] diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index 32a61b7649be..49f97ec27b69 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -21,6 +21,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) +// replica_application_*.go files provide concrete implementations of +// the interfaces defined in the storage/apply package: +// +// replica_application_state_machine.go -> apply.StateMachine +// replica_application_decoder.go -> apply.Decoder +// replica_application_cmd.go -> apply.Command (and variants) +// replica_application_cmd_buf.go -> apply.CommandIterator (and variants) +// replica_application_cmd_buf.go -> apply.CommandList (and variants) +// +// These allow Replica to interface with the storage/apply package. + // isTrivial determines whether the side-effects of a ReplicatedEvalResult are // "trivial". 
A result is fundamentally considered "trivial" if it does not have // side effects which rely on the written state of the replica exactly matching @@ -100,7 +111,7 @@ func clearTrivialReplicatedEvalResultFields(r *storagepb.ReplicatedEvalResult) { // ReplicatedEvalResult because the process of handling the replicated eval // result will zero-out the struct to ensure that is has properly performed all // of the implied side-effects. -func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { +func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) { if !cmd.IsLocal() { return } @@ -179,7 +190,7 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *cmdAppCtx) { // has already been successfully applied or has been reproposed here or by a // different entry for the same proposal that hit an illegal lease index error. func (r *Replica) tryReproposeWithNewLeaseIndex( - ctx context.Context, cmd *cmdAppCtx, + ctx context.Context, cmd *replicatedCmd, ) *roachpb.Error { // Note that we don't need to validate anything about the proposal's // lease here - if we got this far, we know that everything but the @@ -212,10 +223,11 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( return nil } -// Replica.handleXYZResult methods are called by replicaApplier.ApplySideEffects -// when applying non-trivial side effects. As a general rule, there is a method -// for each of the non-trivial fields in ReplicatedEvalResult. Most methods are -// simple enough that they will be inlined. +// The following Replica.handleXYZResult methods are called when applying +// non-trivial side effects in replicaStateMachine.ApplySideEffects. As a +// general rule, there is a method for each of the non-trivial fields in +// ReplicatedEvalResult. Most methods are simple enough that they will be +// inlined. 
func (r *Replica) handleSplitResult(ctx context.Context, split *storagepb.Split) { splitPostApply(ctx, split.RHSDelta, &split.SplitTrigger, r) diff --git a/pkg/storage/replica_application_impl.go b/pkg/storage/replica_application_state_machine.go similarity index 92% rename from pkg/storage/replica_application_impl.go rename to pkg/storage/replica_application_state_machine.go index 55b735fbe673..79f846edc720 100644 --- a/pkg/storage/replica_application_impl.go +++ b/pkg/storage/replica_application_state_machine.go @@ -33,10 +33,11 @@ import ( // replica_application_*.go files provide concrete implementations of // the interfaces defined in the storage/apply package: // -// replica_application_decoder.go => apply.Decoder -// replica_application_applier.go => apply.StateMachine -// replica_application_cmd.go => apply.Command (and variants) -// replica_application_cmd_iter.go => apply.CommandIterator (and variants) +// replica_application_state_machine.go -> apply.StateMachine +// replica_application_decoder.go -> apply.Decoder +// replica_application_cmd.go -> apply.Command (and variants) +// replica_application_cmd_buf.go -> apply.CommandIterator (and variants) +// replica_application_cmd_buf.go -> apply.CommandList (and variants) // // These allow Replica to interface with the storage/apply package. @@ -57,7 +58,7 @@ type applyCommittedEntriesStats struct { // reason during the application phase of state machine replication. The only // acceptable recourse is to signal that the replica has become corrupted. // -// All errors returned by replicaDecoder and replicaApplier will be instances +// All errors returned by replicaDecoder and replicaStateMachine will be instances // of this type. type nonDeterministicFailure struct { wrapped error @@ -93,7 +94,7 @@ func (e *nonDeterministicFailure) Cause() error { return e.wrapped } // planned to be moved to the stdlib in go 1.13. 
func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped } -// replicaApplier implements the apply.StateMachine interface. +// replicaStateMachine implements the apply.StateMachine interface. // // The structure coordinates state transitions within the Replica state machine // due to the application of replicated commands decoded from committed raft @@ -102,7 +103,7 @@ func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped } // current view of ReplicaState and staged in a replicaAppBatch, the batch is // committed to the Replica's storage engine atomically, and finally the // side-effects of each command is applied to the Replica's in-memory state. -type replicaApplier struct { +type replicaStateMachine struct { r *Replica // batch is returned from NewBatch(). batch replicaAppBatch @@ -110,12 +111,12 @@ type replicaApplier struct { stats applyCommittedEntriesStats } -// getApplier returns the Replica's apply.StateMachine. The Replica's -// raftMu is held for the entire lifetime of the replicaApplier. -func (r *Replica) getApplier() *replicaApplier { - a := &r.raftMu.applier - a.r = r - return a +// getStateMachine returns the Replica's apply.StateMachine. The Replica's +// raftMu is held for the entire lifetime of the replicaStateMachine. +func (r *Replica) getStateMachine() *replicaStateMachine { + sm := &r.raftMu.stateMachine + sm.r = r + return sm } // shouldApplyCommand determines whether or not a command should be applied to @@ -123,7 +124,7 @@ func (r *Replica) getApplier() *replicaApplier { // then sets the provided command's leaseIndex, proposalRetry, and forcedErr // fields and returns whether command should be applied or rejected. 
func (r *Replica) shouldApplyCommand( - ctx context.Context, cmd *cmdAppCtx, replicaState *storagepb.ReplicaState, + ctx context.Context, cmd *replicatedCmd, replicaState *storagepb.ReplicaState, ) bool { cmd.leaseIndex, cmd.proposalRetry, cmd.forcedErr = checkForcedErr( ctx, cmd.idKey, &cmd.raftCmd, cmd.IsLocal(), replicaState, @@ -320,11 +321,11 @@ func checkForcedErr( } // NewBatch implements the apply.StateMachine interface. -func (a *replicaApplier) NewBatch() apply.Batch { - r := a.r - b := &a.batch +func (sm *replicaStateMachine) NewBatch() apply.Batch { + r := sm.r + b := &sm.batch b.r = r - b.a = a + b.sm = sm b.batch = r.store.engine.NewBatch() r.mu.RLock() b.state = r.mu.state @@ -343,8 +344,8 @@ func (a *replicaApplier) NewBatch() apply.Batch { // to the current view of ReplicaState and staged in the batch. The batch is // committed to the state machine's storage engine atomically. type replicaAppBatch struct { - r *Replica - a *replicaApplier + r *Replica + sm *replicaStateMachine // batch accumulates writes implied by the raft entries in this batch. batch engine.Batch @@ -395,7 +396,7 @@ type replicaAppBatch struct { // whether to accept or reject the next command that is staged without needing // to actually update the replica state machine in between. func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { - cmd := cmdI.(*cmdAppCtx) + cmd := cmdI.(*replicatedCmd) ctx := cmd.ctx if cmd.ent.Index == 0 { return nil, makeNonDetermFailure("processRaftCommand requires a non-zero index") @@ -468,7 +469,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error // migrateReplicatedResult performs any migrations necessary on the command to // normalize it before applying it to the batch. This may modify the command. 
-func (b *replicaAppBatch) migrateReplicatedResult(ctx context.Context, cmd *cmdAppCtx) { +func (b *replicaAppBatch) migrateReplicatedResult(ctx context.Context, cmd *replicatedCmd) { // If the command was using the deprecated version of the MVCCStats proto, // migrate it to the new version and clear out the field. res := cmd.replicatedResult() @@ -483,7 +484,7 @@ func (b *replicaAppBatch) migrateReplicatedResult(ctx context.Context, cmd *cmdA // stageWriteBatch applies the command's write batch to the application batch's // RocksDB batch. This batch is committed to RocksDB in replicaAppBatch.commit. -func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *cmdAppCtx) error { +func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *replicatedCmd) error { wb := cmd.raftCmd.WriteBatch if wb == nil { return nil @@ -501,7 +502,7 @@ func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *cmdAppCtx) e // runPreApplyTriggers runs any triggers that must fire before a command is // applied. It may modify the command's ReplicatedEvalResult. -func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *cmdAppCtx) error { +func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicatedCmd) error { res := cmd.replicatedResult() // AddSSTable ingestions run before the actual batch gets written to the @@ -601,7 +602,9 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *cmdAppCt // modifies the receiver's ReplicaState but does not modify ReplicatedEvalResult // in order to give the TestingPostApplyFilter testing knob an opportunity to // inspect the command's ReplicatedEvalResult. 
-func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(ctx context.Context, cmd *cmdAppCtx) { +func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( + ctx context.Context, cmd *replicatedCmd, +) { if raftAppliedIndex := cmd.ent.Index; raftAppliedIndex != 0 { b.state.RaftAppliedIndex = raftAppliedIndex } @@ -753,9 +756,9 @@ func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { } func (b *replicaAppBatch) recordStatsOnCommit() { - b.a.stats.entriesProcessed += b.entries - b.a.stats.numEmptyEntries += b.emptyEntries - b.a.stats.batchesProcessed++ + b.sm.stats.entriesProcessed += b.entries + b.sm.stats.numEmptyEntries += b.emptyEntries + b.sm.stats.batchesProcessed++ elapsed := timeutil.Since(b.start) b.r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) @@ -777,8 +780,10 @@ func (b *replicaAppBatch) Close() { // the Replica's in-memory state. This method deals with applying non-trivial // side effects of commands, such as finalizing splits/merges and informing // raft about applied config changes. -func (a *replicaApplier) ApplySideEffects(cmdI apply.CheckedCommand) (apply.AppliedCommand, error) { - cmd := cmdI.(*cmdAppCtx) +func (sm *replicaStateMachine) ApplySideEffects( + cmdI apply.CheckedCommand, +) (apply.AppliedCommand, error) { + cmd := cmdI.(*replicatedCmd) ctx := cmd.ctx // Deal with locking during side-effect handling, which is sometimes @@ -788,13 +793,13 @@ func (a *replicaApplier) ApplySideEffects(cmdI apply.CheckedCommand) (apply.Appl } if cmd.replicatedResult().BlockReads { cmd.replicatedResult().BlockReads = false - a.r.readOnlyCmdMu.Lock() - defer a.r.readOnlyCmdMu.Unlock() + sm.r.readOnlyCmdMu.Lock() + defer sm.r.readOnlyCmdMu.Unlock() } // Set up the local result prior to handling the ReplicatedEvalResult to // give testing knobs an opportunity to inspect it. 
- a.r.prepareLocalResult(ctx, cmd) + sm.r.prepareLocalResult(ctx, cmd) if log.ExpensiveLogEnabled(ctx, 2) { log.VEvent(ctx, 2, cmd.localResult.String()) } @@ -806,29 +811,29 @@ func (a *replicaApplier) ApplySideEffects(cmdI apply.CheckedCommand) (apply.Appl // before notifying a potentially waiting client. clearTrivialReplicatedEvalResultFields(cmd.replicatedResult()) if !cmd.IsTrivial() { - shouldAssert := a.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) + shouldAssert := sm.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) // NB: Perform state assertion before acknowledging the client. // Some tests (TestRangeStatsInit) assumes that once the store has started // and the first range has a lease that there will not be a later hard-state. if shouldAssert { // Assert that the on-disk state doesn't diverge from the in-memory // state as a result of the side effects. - a.r.mu.Lock() - a.r.assertStateLocked(ctx, a.r.store.Engine()) - a.r.mu.Unlock() - a.stats.stateAssertions++ + sm.r.mu.Lock() + sm.r.assertStateLocked(ctx, sm.r.store.Engine()) + sm.r.mu.Unlock() + sm.stats.stateAssertions++ } } else if res := cmd.replicatedResult(); !res.Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", res) } if cmd.replicatedResult().RaftLogDelta == 0 { - a.r.handleNoRaftLogDeltaResult(ctx) + sm.r.handleNoRaftLogDeltaResult(ctx) } if cmd.localResult != nil { - a.r.handleLocalEvalResult(ctx, *cmd.localResult) + sm.r.handleLocalEvalResult(ctx, *cmd.localResult) } - if err := a.maybeApplyConfChange(ctx, cmd); err != nil { + if err := sm.maybeApplyConfChange(ctx, cmd); err != nil { return nil, wrapWithNonDetermFailure(err, "unable to apply conf change") } @@ -857,7 +862,7 @@ func (a *replicaApplier) ApplySideEffects(cmdI apply.CheckedCommand) (apply.Appl // handleNonTrivialReplicatedEvalResult carries out the side-effects of // non-trivial commands. 
It is run with the raftMu locked. It is illegal // to pass a replicatedResult that does not imply any side-effects. -func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( +func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( ctx context.Context, rResult storagepb.ReplicatedEvalResult, ) (shouldAssert bool) { // Assert that this replicatedResult implies at least one side-effect. @@ -867,7 +872,7 @@ func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( if rResult.State != nil { if rResult.State.TruncatedState != nil { - rResult.RaftLogDelta += a.r.handleTruncatedStateResult(ctx, rResult.State.TruncatedState) + rResult.RaftLogDelta += sm.r.handleTruncatedStateResult(ctx, rResult.State.TruncatedState) rResult.State.TruncatedState = nil } @@ -877,12 +882,12 @@ func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( } if rResult.RaftLogDelta != 0 { - a.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta) + sm.r.handleRaftLogDeltaResult(ctx, rResult.RaftLogDelta) rResult.RaftLogDelta = 0 } if rResult.SuggestedCompactions != nil { - a.r.handleSuggestedCompactionsResult(ctx, rResult.SuggestedCompactions) + sm.r.handleSuggestedCompactionsResult(ctx, rResult.SuggestedCompactions) rResult.SuggestedCompactions = nil } @@ -895,33 +900,33 @@ func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( } if rResult.Split != nil { - a.r.handleSplitResult(ctx, rResult.Split) + sm.r.handleSplitResult(ctx, rResult.Split) rResult.Split = nil } if rResult.Merge != nil { - a.r.handleMergeResult(ctx, rResult.Merge) + sm.r.handleMergeResult(ctx, rResult.Merge) rResult.Merge = nil } if rResult.State != nil { if newDesc := rResult.State.Desc; newDesc != nil { - a.r.handleDescResult(ctx, newDesc) + sm.r.handleDescResult(ctx, newDesc) rResult.State.Desc = nil } if newLease := rResult.State.Lease; newLease != nil { - a.r.handleLeaseResult(ctx, newLease) + sm.r.handleLeaseResult(ctx, newLease) rResult.State.Lease = nil } if newThresh := 
rResult.State.GCThreshold; newThresh != nil { - a.r.handleGCThresholdResult(ctx, newThresh) + sm.r.handleGCThresholdResult(ctx, newThresh) rResult.State.GCThreshold = nil } if rResult.State.UsingAppliedStateKey { - a.r.handleUsingAppliedStateKeyResult(ctx) + sm.r.handleUsingAppliedStateKeyResult(ctx) rResult.State.UsingAppliedStateKey = false } @@ -931,12 +936,12 @@ func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( } if rResult.ChangeReplicas != nil { - a.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) + sm.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) rResult.ChangeReplicas = nil } if rResult.ComputeChecksum != nil { - a.r.handleComputeChecksumResult(ctx, rResult.ComputeChecksum) + sm.r.handleComputeChecksumResult(ctx, rResult.ComputeChecksum) rResult.ComputeChecksum = nil } @@ -946,7 +951,7 @@ func (a *replicaApplier) handleNonTrivialReplicatedEvalResult( return true } -func (a *replicaApplier) maybeApplyConfChange(ctx context.Context, cmd *cmdAppCtx) error { +func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *replicatedCmd) error { switch cmd.ent.Type { case raftpb.EntryNormal: if cmd.replicatedResult().ChangeReplicas != nil { @@ -958,7 +963,7 @@ func (a *replicaApplier) maybeApplyConfChange(ctx context.Context, cmd *cmdAppCt // The command was rejected. 
cmd.cc = raftpb.ConfChange{} } - return a.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + return sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ApplyConfChange(cmd.cc) return true, nil }) @@ -967,8 +972,8 @@ func (a *replicaApplier) maybeApplyConfChange(ctx context.Context, cmd *cmdAppCt } } -func (a *replicaApplier) moveStats() applyCommittedEntriesStats { - stats := a.stats - a.stats = applyCommittedEntriesStats{} +func (sm *replicaStateMachine) moveStats() applyCommittedEntriesStats { + stats := sm.stats + sm.stats = applyCommittedEntriesStats{} return stats } diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index de3b54531d78..7b28cbd36fd1 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -718,9 +718,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( applicationStart := timeutil.Now() if len(rd.CommittedEntries) > 0 { - app := r.getApplier() + sm := r.getStateMachine() dec := r.getDecoder() - appTask := apply.MakeTask(app, dec) + appTask := apply.MakeTask(sm, dec) appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) defer appTask.Close() if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { @@ -729,7 +729,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( if err := appTask.ApplyCommittedEntries(ctx); err != nil { return stats, err.(*nonDeterministicFailure).safeExpl, err } - stats.applyCommittedEntriesStats = app.moveStats() + stats.applyCommittedEntriesStats = sm.moveStats() // etcd raft occasionally adds a nil entry (our own commands are never // empty). This happens in two situations: When a new leader is elected, and