Skip to content

Commit

Permalink
Merge #39254
Browse files Browse the repository at this point in the history
39254: storage/apply: create apply package for raft entry application r=nvanbenschoten a=nvanbenschoten

The new package provides abstractions and routines associated with the application of committed raft entries to a replicated state machine.

This was inspired by four driving forces:
- We've been having a number of discussions on the Core team about making storage abstractions more clear and easier to understand in isolation. One commonly discussed proposal is introducing a `storage/replicate` package that would encapsulate the concerns of raft replication (e.g. log manipulation, snapshots, leader election, heartbeats, etc.). This `storage/apply` package will fit in nicely alongside a replication abstraction.
- Initial discussion on #38954 concluded that adding an optimization to acknowledge clients after their raft entries have committed but before they had been applied with the current code structure was moving in the opposite direction and making things even harder to understand due to the introduction of more complex state management.
- Recent instability in this area (#38976, #39064, #39135, #39203) has revealed that there exists a high degree of difficulty involved in testing any of the logic in the area of raft entry application. This has naturally led to testing at a distance using tools like testing hooks, which is frustrating and delicate. As a result, we're missing tests for things like old migrations that we still need to support. We also have trouble writing regression tests when bugs do pop up.
- The proposed optimization in #17500 (comment) to apply committed raft entries to the Replica storage engine asynchronously in a separate thread than the raft processing thread will make entry application significantly more complex. For instance, we'll likely need to introduce a separate scheduler to coordinate entry application passes across Ranges on a node. The schedule will likely want to prioritize leaders over followers and embed other policies to optimize for total system throughput. There's a strong desire to isolate this new complexity and to give the logic a place to live.

The PR begins to address these concerns by formalizing the process of applying committed raft entries. To start, this makes the process easier to understand both in terms of the macro-level steps that are taken during application of a batch of entries and in terms of the impact that an individual command has on the replicated state machine. For instance, the PR helps provide answers to all of the following questions:

- What are the stages of raft entry application?
- What is the difference between a "raft entry" and a "replicated command"?
- What can a command do besides apply its write batch to the storage engine?
- What does it mean for a successfully replicated command to be rejected during application?
- When can we acknowledge the outcome of a raft proposal?

The refactor also uncovers a large testing surface that future PRs will exploit to write targeted unit tests. Not only can the `storage/apply` package be tested with a mock state machine (done in this PR), but we can test Replica's implementation of the state machine interface in isolation without needing to touch raft at all.

Finally, the refactor paves the way for making the proposed change in #38954 in a much cleaner way. This is demonstrated in the second commit, which is being included here to show why certain things were designed the way they were but will not be merged with this PR.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 7, 2019
2 parents e6f7de3 + e2eb877 commit 492d8dd
Show file tree
Hide file tree
Showing 20 changed files with 2,565 additions and 1,591 deletions.
195 changes: 195 additions & 0 deletions pkg/storage/apply/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package apply

// Command is a command that has been successfully replicated through raft
// by being durably committed to the raft log of a quorum of peers in a raft
// group.
type Command interface {
// Index is the log index of the corresponding raft entry.
Index() uint64
// IsTrivial returns whether the command can apply in a batch.
IsTrivial() bool
// IsLocal returns whether the command was locally proposed. Command
// that were locally proposed typically have a client waiting on a
// response, so there is additional urgency to apply them quickly.
IsLocal() bool
}

// CheckedCommand is a command that has been checked to see whether it can
// apply successfully or not. Committing an entry in a raft log and having
// the command in that entry succeed are similar but not equivalent concepts.
// A successfully committed entry may contain a command that the replicated
// state machine decides to reject (deterministically).
type CheckedCommand interface {
Command
// Rejected returns whether the command was rejected.
Rejected() bool
}

// AppliedCommand is a command that has been applied to the replicated state
// machine. A command is considered "applied" if it has been staged in a
// Batch which has been committed and had its side-effects run on the state
// machine. If the command was rejected (see CheckedCommand), applying the
// command will likely be a no-op, but that is up to the implementation of
// the state machine.
type AppliedCommand interface {
CheckedCommand
// FinishAndAckOutcome signals that the application of the command has
// completed. It also acknowledges the outcome of the command to its
// client if it was proposed locally. An error will immediately stall
// entry application, so one must only be returned if the state machine
// is no longer able to make progress.
FinishAndAckOutcome() error
}

// CommandIteratorBase is a common interface extended by all iterator and
// list variants. It is exported so its methods are displayed in godoc when
// it is embedded in other interfaces.
type CommandIteratorBase interface {
// Valid returns whether the iterator is pointing at a valid element.
Valid() bool
// Next advances the iterator. Must not be called if valid is false.
Next()
// Close closes the iterator. Once closed, it must not be used.
Close()
}

// CommandIterator is an iterator over replicated commands.
type CommandIterator interface {
CommandIteratorBase
// Cur returns the command that the iterator is currently pointing at.
// Must not be called if valid is false.
Cur() Command
// NewList returns a new empty command list. Usages of the list will
// always advance the iterator before pushing in to the list, so
// implementors are free to share backing memory between the two.
NewList() CommandList
// NewCheckedList returns a new empty checked command list. Usages
// of the list will always advance the iterator before pushing into
// to the list, so implementors are free to share backing memory
// between the two.
NewCheckedList() CheckedCommandList
}

// CommandList is a list of replicated commands.
type CommandList interface {
CommandIterator
// Append adds the command to the end of the list.
Append(Command)
}

// CheckedCommandIterator is an iterator over checked replicated
// commands.
type CheckedCommandIterator interface {
CommandIteratorBase
// CurChecked returns the checked command that the iterator is
// currently pointing at. Must not be called if valid is false.
CurChecked() CheckedCommand
// NewAppliedList returns a new empty applied command list. Usages
// of the list will always advance the iterator before pushing into
// to the list, so implementors are free to share backing memory
// between the two.
NewAppliedList() AppliedCommandList
}

// CheckedCommandList is a list of checked replicated commands.
type CheckedCommandList interface {
CheckedCommandIterator
// AppendChecked adds the checked command to the end of the list.
AppendChecked(CheckedCommand)
}

// AppliedCommandIterator is an iterator over applied replicated commands.
type AppliedCommandIterator interface {
CommandIteratorBase
// CurApplied returns the applied command that the iterator is
// currently pointing at. Must not be called if valid is false.
CurApplied() AppliedCommand
}

// AppliedCommandList is a list of applied replicated commands.
type AppliedCommandList interface {
AppliedCommandIterator
// AppendApplied adds the applied command to the end of the list.
AppendApplied(AppliedCommand)
}

// takeWhileCmdIter returns an iterator that yields commands based on a
// predicate. It will call the predicate on each command in the provided
// iterator and yield elements while it returns true. The function does
// NOT close the provided iterator, but does drain it of any commands
// that are moved to the returned iterator.
func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIterator {
ret := iter.NewList()
for iter.Valid() {
cmd := iter.Cur()
if !pred(cmd) {
break
}
iter.Next()
ret.Append(cmd)
}
return ret
}

// mapCmdIter returns an iterator that contains the result of each command
// from the provided iterator transformed by a closure. The closure is
// responsible for converting Commands into CheckedCommand. The function
// closes the provided iterator.
func mapCmdIter(
iter CommandIterator, fn func(Command) (CheckedCommand, error),
) (CheckedCommandIterator, error) {
defer iter.Close()
ret := iter.NewCheckedList()
for iter.Valid() {
checked, err := fn(iter.Cur())
if err != nil {
return nil, err
}
iter.Next()
ret.AppendChecked(checked)
}
return ret, nil
}

// mapCheckedCmdIter returns an iterator that contains the result of each
// command from the provided iterator transformed by a closure. The closure
// is responsible for converting CheckedCommand into AppliedCommand. The
// function closes the provided iterator.
func mapCheckedCmdIter(
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
) (AppliedCommandIterator, error) {
defer iter.Close()
ret := iter.NewAppliedList()
for iter.Valid() {
applied, err := fn(iter.CurChecked())
if err != nil {
return nil, err
}
iter.Next()
ret.AppendApplied(applied)
}
return ret, nil
}

// forEachAppliedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error {
defer iter.Close()
for iter.Valid() {
if err := fn(iter.CurApplied()); err != nil {
return err
}
iter.Next()
}
return nil
}
123 changes: 123 additions & 0 deletions pkg/storage/apply/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

/*
Package apply provides abstractions and routines associated with the application
of committed raft entries to a replicated state machine.
State Machine Replication
Raft entry application is the process of taking entries that have been committed
to a raft group's "raft log" through raft consensus and using them to drive the
state machines of each member of the raft group (i.e. each replica). Committed
entries are decoded into commands in the same order that they are arranged in
the raft log (i.e. in order of increasing log index). This ordering of decoded
commands is then treated as the input to state transitions on each replica.
The key to this general approach, known as "state machine replication", is that
all state transitions are fully deterministic given only the current state of
the machine and the command to apply as input. This ensures that if each
instance is driven from the same consistent shared log (same entries, same
order), they will all stay in sync. In other words, if we ensure that all
replicas start as identical copies of each other and we ensure that all replicas
perform the same state transitions, in the same order, deterministically, then
through induction we know that all replicas will remain identical copies of each
other when compared at the same log index.
This poses a problem for replicas that fail for any reason to apply an entry. If
the failure wasn't deterministic across all replicas then they can't carry on
applying entries, as their state may have diverged from their peers. The only
reasonable recourse is to signal that the replica has become corrupted. This
demonstrates why it is necessary to separate deterministic command failures from
non-deterministic state transition failures. The former, which we call "command
rejection" is permissible as long as all replicas come to the same decision to
reject the command and handle the rejection in the same way (e.g. decide not to
make any state transition). The latter, on the other hand, it not permissible,
and is typically handled by crashing the node.
Performance Concerns
The state machine replication approach also poses complications that affect
performance.
A first challenge falls out from the requirement that all replicated commands be
sequentially applied on each replica to enforce determinism. This requirement
must hold even as the concurrency of the systems processing requests and driving
replication grows. If this concurrency imbalance becomes so great that the
sequential processing of updates to the replicated state machine can no longer
keep up with the concurrent processing feeding inputs into the replicated state
machine, replication itself becomes a throughput bottleneck for the system,
manifesting as replication lag. This problem, sometimes referred to as the
"parallelism gap", is fundamentally due to the loss of context on the
interaction between commands after replication and a resulting inability to
determine whether concurrent application of commands would be possible without
compromising determinism. Put another way, above the level of state machine
replication, it is easy to determine which commands conflict with one another,
and those that do not conflict can be run concurrently. However, below the level
of replication, it is unclear which commands conflict, so to ensure determinism
during state machine transitions, no concurrency is possible.
Although it makes no attempt to explicitly introduce concurrency into command
application, this package does attempt to improve replication throughput and
reduce this parallelism gap through the use of batching. A notion of command
triviality is exposed to clients of this package, and those commands that are
trivial are considered able to have their application batched with other
adjacent trivial commands. This batching, while still preserving a strict
ordering of commands, allows multiple commands to achieve some concurrency in
their interaction with the state machine. For instance, writes to a storage
engine from different commands are able to be batched together using this
interface. For more, see Batch.
A second challenge arising from the technique of state machine replication is
its interaction with client responses and acknowledgment. We saw before that a
command is guaranteed to eventually apply if its corresponding raft entry is
committed in the raft log - individual replicas have no other choice but to
apply it. However, depending on the replicated state, the fact that a command
will apply may not be sufficient to return a response to a client. In some
cases, the command may still be rejected (deterministically) and the client
should be alerted of that. In more extreme cases, the result of the command may
not even be known until it is applied to the state machine. In CockroachDB, this
was the case until the major rework that took place in 2016 called "proposer
evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the
completion of that change, client responses are determined before replication
begins. The only remaining work to be done after replication of a command
succeeds is to determine whether it will be rejected and replaced by an empty
command.
A final challenge comes from the desire to properly prioritize the application
of commands across multiple state machines in systems like CockroachDB where
each machine hosts hundreds or thousands of replicas. This is a complicated
concern that must take into consideration the need for each replica's state
machine to stay up-to-date (is it a leaseholder? is it serving reads?), the need
to acknowledge clients in a timely manner (are clients waiting for command
application?), the desire to delay application to accumulate larger application
batches (will batching improve system throughput?), and a number of other
factors. This package has not begun to answer these questions, but it serves to
provide the abstractions necessary to perform such prioritization in the future.
Usage
The package exports a set of interfaces that users must provide implementations
for. Notably, users of the package must provide a StateMachine that encapsulates
the logic behind performing individual state transitions and a Decoder that is
capable of decoding raft entries and providing iteration over corresponding
Command objects.
These two structures can be used to create an application Task, which is capable
of applying raft entries to the StateMachine (see Task.ApplyCommittedEntries).
To do so, the Commands that were decoded using the Decoder (see Task.Decode) are
passed through a pipeline of stages. First, the Commands are checked for
rejection while being staged in an application Batch, which produces a set of
CheckedCommands. Next, the application Batch is committed to the StateMachine.
Following this, the in-memory side-effects of the CheckedCommands are applied to
the StateMachine, producing AppliedCommands. Finally, these AppliedCommands are
finalized and their clients are acknowledged.
*/
package apply
74 changes: 74 additions & 0 deletions pkg/storage/apply/doc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package apply_test

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/storage/apply"
)

func ExampleTask() {
defer setLogging(true)()
ctx := context.Background()
ents := makeEntries(7)

sm := getTestStateMachine()
dec := newTestDecoder()
dec.nonTrivial[5] = true
dec.nonLocal[2] = true
dec.nonLocal[6] = true
dec.shouldReject[3] = true
dec.shouldReject[6] = true

t := apply.MakeTask(sm, dec)
defer t.Close()

fmt.Println("Decode:")
if err := t.Decode(ctx, ents); err != nil {
panic(err)
}

fmt.Println("\nApplyCommittedEntries:")
if err := t.ApplyCommittedEntries(ctx); err != nil {
panic(err)
}
// Output:
//
// Decode:
// decoding command 1; local=true
// decoding command 2; local=false
// decoding command 3; local=true
// decoding command 4; local=true
// decoding command 5; local=true
// decoding command 6; local=false
// decoding command 7; local=true
//
// ApplyCommittedEntries:
// committing batch with commands=[1 2 3 4]
// applying side-effects of command 1
// applying side-effects of command 2
// applying side-effects of command 3
// applying side-effects of command 4
// finishing and acknowledging command 1; rejected=false
// finishing and acknowledging command 2; rejected=false
// finishing and acknowledging command 3; rejected=true
// finishing and acknowledging command 4; rejected=false
// committing batch with commands=[5]
// applying side-effects of command 5
// finishing and acknowledging command 5; rejected=false
// committing batch with commands=[6 7]
// applying side-effects of command 6
// applying side-effects of command 7
// finishing and acknowledging command 6; rejected=true
// finishing and acknowledging command 7; rejected=false
}
Loading

0 comments on commit 492d8dd

Please sign in to comment.