Skip to content

Commit

Permalink
storage: ack Raft proposals after Raft log commit, not state machine …
Browse files Browse the repository at this point in the history
…apply

Informs #17500.

This is a partial revival of #18710 and a culmination of more recent
thinking in #17500 (comment).

The change adjusts the Raft processing loop so that it acknowledges the success
of raft entries as soon as it learns that they have been durably committed to
the raft log instead of after they have been applied to the proposer replica's
replicated state machine. This not only pulls the application latency out of the
hot path for Raft proposals, but it also pulls the next raft ready iteration's
write to its Raft log (with the associated fsync) out of the hot path for Raft
proposals.

This is safe because a proposal through raft is known to have succeeded as
soon as it is replicated to a quorum of replicas (i.e. has committed in the
raft log). The proposal does not need to wait for its effects to be applied
in order to know whether its changes will succeed or fail. The raft log is
the provider of atomicity and durability for replicated writes, not (ignoring
log truncation) the replicated state machine itself, so a client can be
confident in the result of a write as soon as the raft log confirms that it
has succeeded.

However, there are a few complications to acknowledging the success of a
proposal at this stage:

1. Committing an entry in the raft log and having the command in that entry
   succeed are similar but not equivalent concepts. Even if the entry succeeds
   in achieving durability by replicating to a quorum of replicas, its command
   may still be rejected "beneath raft". This means that a (deterministic)
   check after replication decides that the command will not be applied to the
   replicated state machine. In that case, the client waiting on the result of
   the command should not be informed of its success. Luckily, this check is
   cheap to perform so we can do it here and when applying the command. See
   Replica.shouldApplyCommand.

2. Some commands perform non-trivial work such as updating Replica configuration
   state or performing Range splits. In those cases, it's likely that the client
   is interested in not only knowing whether it has succeeded in sequencing the
   change in the raft log, but also in knowing when the change has gone into
   effect. There's currently no exposed hook to ask for an acknowledgement only
   after a command has been applied, so for simplicity the current implementation
   only ever acks transactional writes before they have gone into effect. All
   other commands wait until they have been applied to ack their client.

3. Even though we can determine whether a command has succeeded without applying
   it, the effect of the command will not be visible to conflicting commands until
   it is applied. Because of this, the client can be informed of the success of
   a write at this point, but we cannot release that write's latches until the
   write has applied. See ProposalData.signalProposalResult/finishApplication.

\### Benchmarks

The change appears to result in an **8-10%** improvement to throughput and a
**6-10%** reduction in p50 latency across the board on kv0. I ran a series of
tests with different node sizes and difference workload concurrencies and the
win seemed pretty stable. This was also true regardless of whether the writes
were to a single Raft group or a large number of Raft groups.

```
name                           old ops/sec  new ops/sec  delta
kv0/cores=16/nodes=3/conc=32    24.1k ± 0%   26.1k ± 1%   +8.35%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48    30.4k ± 1%   32.8k ± 1%   +8.02%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64    34.6k ± 1%   37.6k ± 0%   +8.79%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72    46.6k ± 1%   50.8k ± 0%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108   58.8k ± 1%   64.0k ± 1%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144   68.1k ± 1%   74.5k ± 1%   +9.45%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=144   55.8k ± 1%   59.7k ± 2%   +7.12%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216   64.4k ± 4%   68.1k ± 4%   +5.65%  (p=0.016 n=5+5)
kv0/cores=72/nodes=3/conc=288   68.8k ± 2%   74.5k ± 3%   +8.39%  (p=0.008 n=5+5)

name                           old p50(ms)  new p50(ms)  delta
kv0/cores=16/nodes=3/conc=32     1.30 ± 0%    1.20 ± 0%   -7.69%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48     1.50 ± 0%    1.40 ± 0%   -6.67%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64     1.70 ± 0%    1.60 ± 0%   -5.88%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     1.40 ± 0%    1.30 ± 0%   -7.14%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    1.60 ± 0%    1.50 ± 0%   -6.25%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    1.84 ± 3%    1.70 ± 0%   -7.61%  (p=0.000 n=5+4)
kv0/cores=72/nodes=3/conc=144    2.00 ± 0%    1.80 ± 0%  -10.00%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    2.46 ± 2%    2.20 ± 0%  -10.57%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=288    2.80 ± 0%    2.60 ± 0%   -7.14%  (p=0.079 n=4+5)

name                           old p99(ms)  new p99(ms)  delta
kv0/cores=16/nodes=3/conc=32     3.50 ± 0%    3.50 ± 0%     ~     (all equal)
kv0/cores=16/nodes=3/conc=48     4.70 ± 0%    4.58 ± 3%     ~     (p=0.167 n=5+5)
kv0/cores=16/nodes=3/conc=64     5.50 ± 0%    5.20 ± 0%   -5.45%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     5.00 ± 0%    4.70 ± 0%   -6.00%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    5.80 ± 0%    5.50 ± 0%   -5.17%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    6.48 ± 3%    6.18 ± 3%   -4.63%  (p=0.079 n=5+5)
kv0/cores=72/nodes=3/conc=144    11.0 ± 0%    10.5 ± 0%   -4.55%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    13.4 ± 2%    13.2 ± 5%     ~     (p=0.683 n=5+5)
kv0/cores=72/nodes=3/conc=288    18.2 ± 4%    17.2 ± 3%   -5.70%  (p=0.079 n=5+5)
```

Release note (performance improvement): Raft entries no longer wait to
be applied to the RocksDB storage engine before signaling their success
to clients, they now only wait until they are committed in their Raft log.
  • Loading branch information
nvanbenschoten committed Aug 12, 2019
1 parent d05f199 commit 9ad4965
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 33 deletions.
20 changes: 20 additions & 0 deletions pkg/storage/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type CheckedCommand interface {
Command
// Rejected returns whether the command was rejected.
Rejected() bool
// CanAckBeforeApplication returns whether the success of the command
// can be acknowledged before the command has been applied to the state
// machine.
CanAckBeforeApplication() bool
// AckSuccess acknowledges the success of the command to its client.
// Must only be called if !Rejected.
AckSuccess() error
}

// AppliedCommand is a command that has been applied to the replicated state
Expand Down Expand Up @@ -182,6 +189,19 @@ func mapCheckedCmdIter(
return ret, nil
}

// forEachCheckedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error {
defer iter.Close()
for iter.Valid() {
if err := fn(iter.CurChecked()); err != nil {
return err
}
iter.Next()
}
return nil
}

// forEachAppliedCmdIter calls a closure on each command in the provided
// iterator. The function closes the provided iterator.
func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error {
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/apply/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the
completion of that change, client responses are determined before replication
begins. The only remaining work to be done after replication of a command
succeeds is to determine whether it will be rejected and replaced by an empty
command.
command. To facilitate this acknowledgement as early as possible, this package
provides the ability to acknowledge a series of commands before applying them to
the state machine. Outcomes are determined before performing any durable work by
stepping commands through an in-memory "ephemeral" copy of the state machine.
For more, see Task.AckCommittedEntriesBeforeApplication.
A final challenge comes from the desire to properly prioritize the application
of commands across multiple state machines in systems like CockroachDB where
Expand Down
43 changes: 39 additions & 4 deletions pkg/storage/apply/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,46 @@ func ExampleTask() {
dec.nonLocal[6] = true
dec.shouldReject[3] = true
dec.shouldReject[6] = true
fmt.Print(`
Setting up a batch of seven log entries:
- index 2 and 6 are non-local
- index 3 and 6 will be rejected
- index 5 is not trivial
`)

t := apply.MakeTask(sm, dec)
defer t.Close()

fmt.Println("Decode:")
fmt.Println("\nDecode (note that index 2 and 6 are not local):")
if err := t.Decode(ctx, ents); err != nil {
panic(err)
}

fmt.Println("\nAckCommittedEntriesBeforeApplication:")
if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
panic(err)
}
fmt.Print(`
Above, only index 1 and 4 get acked early. The command at 5 is
non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
must be in the first batch to qualify for acking early. 2 is not local
(so there's nobody to ack), and 3 is rejected. We can't ack rejected
commands early because the state machine is free to handle them any way
it likes.
`)

fmt.Println("\nApplyCommittedEntries:")
if err := t.ApplyCommittedEntries(ctx); err != nil {
panic(err)
}
// Output:
//
// Decode:
// Setting up a batch of seven log entries:
// - index 2 and 6 are non-local
// - index 3 and 6 will be rejected
// - index 5 is not trivial
//
// Decode (note that index 2 and 6 are not local):
// decoding command 1; local=true
// decoding command 2; local=false
// decoding command 3; local=true
Expand All @@ -53,16 +77,27 @@ func ExampleTask() {
// decoding command 6; local=false
// decoding command 7; local=true
//
// AckCommittedEntriesBeforeApplication:
// acknowledging command 1 before application
// acknowledging command 4 before application
//
// Above, only index 1 and 4 get acked early. The command at 5 is
// non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
// must be in the first batch to qualify for acking early. 2 is not local
// (so there's nobody to ack), and 3 is rejected. We can't ack rejected
// commands early because the state machine is free to handle them any way
// it likes.
//
// ApplyCommittedEntries:
// applying batch with commands=[1 2 3 4]
// applying side-effects of command 1
// applying side-effects of command 2
// applying side-effects of command 3
// applying side-effects of command 4
// finishing and acknowledging command 1; rejected=false
// finishing command 1; rejected=false
// finishing and acknowledging command 2; rejected=false
// finishing and acknowledging command 3; rejected=true
// finishing and acknowledging command 4; rejected=false
// finishing command 4; rejected=false
// applying batch with commands=[5]
// applying side-effects of command 5
// finishing and acknowledging command 5; rejected=false
Expand Down
112 changes: 110 additions & 2 deletions pkg/storage/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ type StateMachine interface {
// effects that a group of Commands will have on the replicated state
// machine. Commands are staged in the batch one-by-one and then the
// entire batch is committed at once.
NewBatch() Batch
//
// Batch comes in two flavors - real batches and ephemeral batches.
// Real batches are capable of accumulating updates from commands and
// applying them to the state machine. Ephemeral batches are not able
// to make changes to the durable state machine, but can still be used
// for the purpose of checking commands to determine whether they will
// be rejected or not when staged in a real batch. The principal user
// of ephemeral batches is AckCommittedEntriesBeforeApplication.
NewBatch(ephemeral bool) Batch
// ApplySideEffects applies the in-memory side-effects of a Command to
// the replicated state machine. The method will be called in the order
// that the commands are committed to the state machine's log. Once the
Expand Down Expand Up @@ -117,6 +125,106 @@ func (t *Task) assertDecoded() {
}
}

// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of
// raft entries that have been durably committed to the raft log but have not
// yet been applied to the proposer replica's replicated state machine.
//
// This is safe because a proposal through raft can be known to have succeeded
// as soon as it is durably replicated to a quorum of replicas (i.e. has
// committed in the raft log). The proposal does not need to wait for the
// effects of the proposal to be applied in order to know whether its changes
// will succeed or fail. This is because the raft log is the provider of
// atomicity and durability for replicated writes, not (ignoring log
// truncation) the replicated state machine itself.
//
// However, there are a few complications to acknowledging the success of a
// proposal at this stage:
//
// 1. Committing an entry in the raft log and having the command in that entry
// succeed are similar but not equivalent concepts. Even if the entry succeeds
// in achieving durability by replicating to a quorum of replicas, its command
// may still be rejected "beneath raft". This means that a (deterministic)
// check after replication decides that the command will not be applied to the
// replicated state machine. In that case, the client waiting on the result of
// the command should not be informed of its success. Luckily, this check is
// cheap to perform so we can do it here and when applying the command.
//
// Determining whether the command will succeed or be rejected before applying
// it for real is accomplished using an ephemeral batch. Commands are staged in
// the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
// immediately even though the ephemeral batch itself cannot be used to update
// the durable state machine. Once the rejection status of each command is
// determined, any successful commands that permit acknowledgement before
// application (see CanAckBeforeApplication) are acknowledged. The ephemeral
// batch is then thrown away.
//
// 2. Some commands perform non-trivial work such as updating Replica configuration
// state or performing Range splits. In those cases, it's likely that the client
// is interested in not only knowing whether it has succeeded in sequencing the
// change in the raft log, but also in knowing when the change has gone into
// effect. There's currently no exposed hook to ask for an acknowledgement only
// after a command has been applied, so for simplicity the current implementation
// only ever acks transactional writes before they have gone into effect. All
// other commands wait until they have been applied to ack their client.
//
// 3. Even though we can determine whether a command has succeeded without applying
// it, the effect of the command will not be visible to conflicting commands until
// it is applied. Because of this, the client can be informed of the success of
// a write at this point, but we cannot release that write's latches until the
// write has applied. See ProposalData.signalProposalResult/finishApplication.
//
// 4. etcd/raft may provided a series of CommittedEntries in a Ready struct that
// haven't actually been appended to our own log. This is most common in single
// node replication groups, but it is possible when a follower in a multi-node
// replication group is catching up after falling behind. In the first case,
// the entries are not yet committed so acknowledging them would be a lie. In
// the second case, the entries are committed so we could acknowledge them at
// this point, but doing so seems risky. To avoid complications in either case,
// the method takes a maxIndex parameter that limits the indexes that it will
// acknowledge. Typically, callers will supply the highest index that they have
// durably written to their raft log for this upper bound.
//
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
t.assertDecoded()
if !t.anyLocal {
return nil // fast-path
}

// Create a new ephemeral application batch. All we're interested in is
// whether commands will be rejected or not when staged in a real batch.
batch := t.sm.NewBatch(true /* ephemeral */)
defer batch.Close()

iter := t.dec.NewCommandIter()
defer iter.Close()

// Collect a batch of trivial commands from the applier. Stop at the first
// non-trivial command or at the first command with an index above maxIndex.
batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
if cmd.Index() > maxIndex {
return false
}
return cmd.IsTrivial()
})

// Stage the commands in the (ephemeral) batch.
stagedIter, err := mapCmdIter(batchIter, batch.Stage)
if err != nil {
return err
}

// Acknowledge any locally-proposed commands that succeeded in being staged
// in the batch and can be acknowledged before they are actually applied.
// Don't acknowledge rejected proposals early because the StateMachine may
// want to retry the command instead of returning the error to the client.
return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess()
}
return nil
})
}

// SetMaxBatchSize sets the maximum application batch size. If 0, no limit
// will be placed on the number of commands that can be applied in a batch.
func (t *Task) SetMaxBatchSize(size int) {
Expand Down Expand Up @@ -144,7 +252,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
// b) exactly one non-trivial command
func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
// Create a new application batch.
batch := t.sm.NewBatch()
batch := t.sm.NewBatch(false /* ephemeral */)
defer batch.Close()

// Consume a batch-worth of commands.
Expand Down
Loading

0 comments on commit 9ad4965

Please sign in to comment.