Skip to content

Commit

Permalink
storage/apply: create apply package for raft entry application
Browse files Browse the repository at this point in the history
The new package provides abstractions and routines associated with the
application of committed raft entries to a replicated state machine.

This was inspired by four driving forces:
- We've been having a number of discussions on the Core team about
  making storage abstractions more clear and easier to understand
  in isolation. One commonly discussed proposal is introducing a
  `storage/replicate` package that would encapsulate the concerns of
  raft replication (e.g. log manipulation, snapshots, leader election,
  heartbeats, etc.). This `storage/apply` package will fit in nicely
  alongside a replication abstraction.
- Initial discussion on cockroachdb#38954 concluded that adding an optimization
  to acknowledge clients after their raft entries have committed but
  before they had been applied with the current code structure was
  moving in the opposite direction and making things even harder to
  understand due to the introduction of more complex state management.
- Recent instability in this area (cockroachdb#38976, cockroachdb#39064, cockroachdb#39135, cockroachdb#39203) has
  revealed that there exists a high degree of difficulty involved in testing
  any of the logic in the area of raft entry application. This has naturally
  led to testing at a distance using tools like testing hooks, which is
  frustrating and delicate. As a result, we're missing tests for thing
  like old migrations that we still need to support. We also have trouble
  writing regression tests when bugs do pop up.
- The proposed optimization in cockroachdb#17500 (comment)
  to apply committed raft entries to the Replica storage engine asynchronously
  in a separate thread than the raft processing thread will make entry
  application significantly more complex. For instance, we'll likely
  need to introduce a separate scheduler to coordinate entry application
  passes across Ranges on a node. The schedule will likely want to
  prioritize leaders over followers and embed other policies to optimize
  for total system throughput. There's a strong desire to isolate this new
  complexity and to give the logic a place to live.

The PR begins to address these concerns by formalizing the process of
applying committed raft entries. To start, this makes the process easier
to understand both in terms of the macro-level steps that are taken during
application of a batch of entries and in terms of the impact that an individual
command has on the replicated state machine. For instance, the PR helps provide
answers to all of the following questions:

- What are the stages of raft entry application?
- What is the difference between a "raft entry" and a "replicated command"?
- What can a command do besides apply its write batch to the storage engine?
- What does it mean for a successfully replicated command to be rejected during application?
- When can we acknowledge the outcome of a raft proposal?

The refactor also uncovers a large testing surface that future PRs will exploit
to write targeted unit tests. Not only can the `storage/apply` package be tested
with a mock state machine (done in this PR), but we can test Replica's
implementation of the state machine interface in isolation without needing
to touch raft at all.

Finally, the refactor paves the way for making the proposed change in cockroachdb#38954
in a much cleaner way. This is demonstrated in next commit, which is being
included here to show why certain things were designed the way they were
but will not be merged with this PR.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 2, 2019
1 parent 175c5ad commit fc95a5a
Show file tree
Hide file tree
Showing 15 changed files with 2,036 additions and 1,387 deletions.
204 changes: 204 additions & 0 deletions pkg/storage/apply/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package apply

// Command is a command that has been successfully replicated through raft
// by being durably committed to the raft log of a quorum of peers in a raft
// group.
type Command interface {
// Index is the log index of the corresponding raft entry.
Index() uint64
// IsTrivial returns whether the command can apply in a batch.
IsTrivial() bool
// IsLocal returns whether the command was locally proposed.
IsLocal() bool
}

// CheckedCommand is a command that has been checked to see whether it can
// apply successfully or not. Committing an entry in a raft log and having
// the command in that entry succeed are similar but not equivalent concepts.
// A successfully committed entry may contain a command that the replicated
// state machine decides to reject (deterministically).
type CheckedCommand interface {
Command
// Rejected returns whether the command was rejected.
Rejected() bool
}

// AppliedCommand is a command that has been applied to the replicated state
// machine. A command is considered "applied" if it has been staged in a
// Batch which has been committed and had its side-effects run on the state
// machine. If the command was rejected (see CheckedCommand), applying the
// command will likely be a no-op, but that is up to the implementation of
// the state machine.
type AppliedCommand interface {
CheckedCommand
// AckOutcomeAndFinish acknowledges the outcome of the command to its
// client. It also signals that the application of the command has
// completed.
AckOutcomeAndFinish() error
}

// CommandIteratorBase is a common interface extended by all iterator and
// list variants. It is exported so its methods are displayed in godoc when
// it is embedded in other interfaces.
type CommandIteratorBase interface {
// Valid returns whether the iterator is pointing at a valid element.
Valid() bool
// Next advances the iterator. Should not be called if valid is false.
Next()
// NewList returns a new empty command list. Usages of the list will
// always advance the iterator before pushing into to the list, so
// implementors are free to share backing memory between the two.
NewList() CommandList
// NewCheckedList returns a new empty checked command list. Usages
// of the list will always advance the iterator before pushing into
// to the list, so implementors are free to share backing memory
// between the two.
NewCheckedList() CheckedCommandList
// NewAppliedList returns a new empty applied command list. Usages
// of the list will always advance the iterator before pushing into
// to the list, so implementors are free to share backing memory
// between the two.
NewAppliedList() AppliedCommandList
// Close closes the iterator. Once closed, it must not be used.
Close()
}

// CommandIterator is an iterator over replicated commands.
type CommandIterator interface {
CommandIteratorBase
// cur returns the command that the iterator is currently pointing at.
// Should not be called if valid is false.
Cur() Command
}

// CommandList is a list of replicated commands.
type CommandList interface {
CommandIterator
// Append pushes the command on to the back of the list.
Append(Command)
}

// CheckedCommandIterator is an iterator over checked replicated
// commands.
type CheckedCommandIterator interface {
CommandIteratorBase
// cur returns the checked command that the iterator is currently
// pointing at. Should not be called if valid is false.
CurChecked() CheckedCommand
}

// CheckedCommandList is a list of checked replicated commands.
type CheckedCommandList interface {
CheckedCommandIterator
// AppendChecked pushes the checked command on to the back of the list.
AppendChecked(CheckedCommand)
}

// AppliedCommandIterator is an iterator over applied replicated commands.
type AppliedCommandIterator interface {
CommandIteratorBase
// cur returns the applied command that the iterator is currently
// pointing at. Should not be called if valid is false.
CurApplied() AppliedCommand
}

// AppliedCommandList is a list of applied replicated commands.
type AppliedCommandList interface {
AppliedCommandIterator
// AppendApplied pushes the applied command on to the back of the list.
AppendApplied(AppliedCommand)
}

// takeWhileCmdIter returns an iterator that yields commands based on a
// predicate. It will call the predicate on each command in the provided
// iterator and yield elements while it returns true. The function does
// NOT close the provided iterator, but does drain it of any commands
// that are moved to the returned iterator.
func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIterator {
ret := iter.NewList()
for iter.Valid() {
cmd := iter.Cur()
if !pred(cmd) {
break
}
iter.Next()
ret.Append(cmd)
}
return ret
}

// mapCmdIter returns an iterator that contains the result of each command
// from the provided iterator transformed by a closure. The closure is
// responsible for converting Commands into CheckedCommand. The function
// closes the provided iterator.
func mapCmdIter(
iter CommandIterator, fn func(Command) (CheckedCommand, error),
) (CheckedCommandIterator, error) {
defer iter.Close()
ret := iter.NewCheckedList()
for iter.Valid() {
checked, err := fn(iter.Cur())
if err != nil {
return nil, err
}
iter.Next()
ret.AppendChecked(checked)
}
return ret, nil
}

// mapCheckedCmdIter returns an iterator that contains the result of each
// command from the provided iterator transformed by a closure. The closure
// is responsible for converting CheckedCommand into AppliedCommand. The
// function closes the provided iterator.
func mapCheckedCmdIter(
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
) (AppliedCommandIterator, error) {
defer iter.Close()
ret := iter.NewAppliedList()
for iter.Valid() {
applied, err := fn(iter.CurChecked())
if err != nil {
return nil, err
}
iter.Next()
ret.AppendApplied(applied)
}
return ret, nil
}

// forEachCheckedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error {
defer iter.Close()
for iter.Valid() {
if err := fn(iter.CurChecked()); err != nil {
return err
}
iter.Next()
}
return nil
}

// forEachAppliedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error {
defer iter.Close()
for iter.Valid() {
if err := fn(iter.CurApplied()); err != nil {
return err
}
iter.Next()
}
return nil
}
Loading

0 comments on commit fc95a5a

Please sign in to comment.