storage: ack Raft proposals after Raft log commit, not state machine apply

Informs cockroachdb#17500.

This is a partial revival of cockroachdb#18710 and a culmination of more recent
thinking in cockroachdb#17500 (comment).

The change adjusts the Raft processing loop so that it acknowledges the success
of raft entries as soon as it learns that they have been durably committed to
the raft log instead of after they have been applied to the proposer replica's
replicated state machine. This not only pulls the application latency out of the
hot path for Raft proposals, but also pulls the next Raft ready iteration's log
write (with its associated fsync) out of that hot path.

This is safe because a proposal through raft is known to have succeeded as
soon as it is replicated to a quorum of replicas (i.e. has committed in the
raft log). The proposal does not need to wait for its effects to be applied
in order to know whether its changes will succeed or fail. The raft log is
the provider of atomicity and durability for replicated writes, not (ignoring
log truncation) the replicated state machine itself, so a client can be
confident in the result of a write as soon as the raft log confirms that it
has succeeded.
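
As a rough illustration of the idea, here is a minimal sketch (the types, fields,
and function names below are invented for the example and are not the actual
CockroachDB implementation): the proposer completes waiting clients as soon as the
Raft ready loop reports their entries' indexes as durably committed, while state
machine application continues asynchronously.

```go
package main

import "fmt"

// proposal tracks a client write that has been handed to Raft. All names in
// this sketch are invented for illustration; they are not CockroachDB's types.
type proposal struct {
	index  uint64     // log index assigned to the entry by Raft
	doneCh chan error // receives the outcome to acknowledge the waiting client
}

// ackCommitted signals every pending proposal whose entry sits at or below the
// durably committed index. The entry's effects have not been applied yet, but
// (modulo the below-Raft rejection check discussed later) its fate is decided.
func ackCommitted(pending map[uint64]*proposal, committedIndex uint64) {
	for idx, p := range pending {
		if idx <= committedIndex {
			p.doneCh <- nil // client learns of success before application
			delete(pending, idx)
		}
	}
}

func main() {
	p := &proposal{index: 7, doneCh: make(chan error, 1)}
	pending := map[uint64]*proposal{p.index: p}
	// Pretend the Raft ready loop just learned that index 7 is committed.
	ackCommitted(pending, 7)
	fmt.Println("acked before apply:", <-p.doneCh == nil)
}
```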

However, there are a few complications to acknowledging the success of a
proposal at this stage:

1. Committing an entry in the raft log and having the command in that entry
   succeed are similar but not equivalent concepts. Even if the entry succeeds
   in achieving durability by replicating to a quorum of replicas, its command
   may still be rejected "beneath raft". This means that a (deterministic)
   check after replication decides that the command will not be applied to the
   replicated state machine. In that case, the client waiting on the result of
   the command should not be informed of its success. Luckily, this check is
   cheap to perform, so we can do it both here and when applying the command.
   See Replica.shouldApplyCommand and the sketch after this list.

2. Some commands perform non-trivial work such as updating Replica configuration
   state or performing Range splits. In those cases, it's likely that the client
   is interested not only in knowing whether it has succeeded in sequencing the
   change in the raft log, but also in knowing when the change has gone into
   effect. There's currently no exposed hook to ask for an acknowledgement only
   after a command has been applied, so for simplicity the current implementation
   only ever acks transactional writes before they have gone into effect. All
   other commands wait until they have been applied to ack their client.

3. Even though we can determine whether a command has succeeded without applying
   it, the effect of the command will not be visible to conflicting commands until
   it is applied. Because of this, the client can be informed of the success of
   a write at this point, but we cannot release that write's latches until the
   write has applied. See ProposalData.signalProposalResult/finishApplication.
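
To make points 1 and 3 concrete, the following is a minimal sketch, not the actual
implementation: every name is invented except Replica.shouldApplyCommand, which is
only referenced conceptually and whose real signature is not reproduced here. The
proposer runs the same cheap, deterministic rejection check that application will
later run, signals the client with the outcome as soon as the entry is committed,
and keeps the command's latches held until application makes its effects visible.

```go
package main

import (
	"errors"
	"fmt"
)

// cmd models a committed Raft command that is waiting to be acknowledged and
// applied. Every name here is illustrative.
type cmd struct {
	leaseSeqAtPropose uint64
	result            chan error
	releaseLatches    func()
}

// shouldApply stands in for the deterministic "beneath raft" check from point
// 1: it must produce the same answer when run at ack time and at apply time.
func shouldApply(c *cmd, currentLeaseSeq uint64) error {
	if c.leaseSeqAtPropose != currentLeaseSeq {
		return errors.New("proposed under a stale lease; command will be rejected")
	}
	return nil
}

// ackAfterCommit signals the client once the entry is known to be committed in
// the Raft log, forwarding a rejection if the command will not apply. Latches
// are NOT released here (point 3); conflicting commands must keep waiting.
func ackAfterCommit(c *cmd, currentLeaseSeq uint64) {
	c.result <- shouldApply(c, currentLeaseSeq)
}

// finishApplication releases latches once the command's effects are visible.
func finishApplication(c *cmd) {
	c.releaseLatches()
}

func main() {
	released := false
	c := &cmd{
		leaseSeqAtPropose: 3,
		result:            make(chan error, 1),
		releaseLatches:    func() { released = true },
	}
	ackAfterCommit(c, 3)
	fmt.Println("client result:", <-c.result)  // <nil>: success known early
	fmt.Println("latches released:", released) // false: effects not yet visible
	finishApplication(c)
	fmt.Println("latches released:", released) // true: now visible to others
}
```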

### Benchmarks

The change appears to result in an **8-10%** improvement to throughput and a
**6-10%** reduction in p50 latency across the board on kv0. I ran a series of
tests with different node sizes and workload concurrencies, and the
win seemed pretty stable. This was also true regardless of whether the writes
were to a single Raft group or a large number of Raft groups.

```
name                           old ops/sec  new ops/sec  delta
kv0/cores=16/nodes=3/conc=32    24.1k ± 0%   26.1k ± 1%   +8.35%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48    30.4k ± 1%   32.8k ± 1%   +8.02%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64    34.6k ± 1%   37.6k ± 0%   +8.79%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72    46.6k ± 1%   50.8k ± 0%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108   58.8k ± 1%   64.0k ± 1%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144   68.1k ± 1%   74.5k ± 1%   +9.45%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=144   55.8k ± 1%   59.7k ± 2%   +7.12%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216   64.4k ± 4%   68.1k ± 4%   +5.65%  (p=0.016 n=5+5)
kv0/cores=72/nodes=3/conc=288   68.8k ± 2%   74.5k ± 3%   +8.39%  (p=0.008 n=5+5)

name                           old p50(ms)  new p50(ms)  delta
kv0/cores=16/nodes=3/conc=32     1.30 ± 0%    1.20 ± 0%   -7.69%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48     1.50 ± 0%    1.40 ± 0%   -6.67%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64     1.70 ± 0%    1.60 ± 0%   -5.88%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     1.40 ± 0%    1.30 ± 0%   -7.14%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    1.60 ± 0%    1.50 ± 0%   -6.25%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    1.84 ± 3%    1.70 ± 0%   -7.61%  (p=0.000 n=5+4)
kv0/cores=72/nodes=3/conc=144    2.00 ± 0%    1.80 ± 0%  -10.00%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    2.46 ± 2%    2.20 ± 0%  -10.57%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=288    2.80 ± 0%    2.60 ± 0%   -7.14%  (p=0.079 n=4+5)

name                           old p99(ms)  new p99(ms)  delta
kv0/cores=16/nodes=3/conc=32     3.50 ± 0%    3.50 ± 0%     ~     (all equal)
kv0/cores=16/nodes=3/conc=48     4.70 ± 0%    4.58 ± 3%     ~     (p=0.167 n=5+5)
kv0/cores=16/nodes=3/conc=64     5.50 ± 0%    5.20 ± 0%   -5.45%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     5.00 ± 0%    4.70 ± 0%   -6.00%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    5.80 ± 0%    5.50 ± 0%   -5.17%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    6.48 ± 3%    6.18 ± 3%   -4.63%  (p=0.079 n=5+5)
kv0/cores=72/nodes=3/conc=144    11.0 ± 0%    10.5 ± 0%   -4.55%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    13.4 ± 2%    13.2 ± 5%     ~     (p=0.683 n=5+5)
kv0/cores=72/nodes=3/conc=288    18.2 ± 4%    17.2 ± 3%   -5.70%  (p=0.079 n=5+5)
```

Release note (performance improvement): Raft entries no longer wait to
be applied to the RocksDB storage engine before signaling their success
to clients; they now wait only until they have been committed to the Raft log.
nvanbenschoten committed Jul 18, 2019
1 parent bfece87 commit cf39259
Showing 11 changed files with 470 additions and 198 deletions.
52 changes: 31 additions & 21 deletions pkg/storage/client_replica_test.go
@@ -154,14 +154,19 @@ func TestRejectFutureCommand(t *testing.T) {
if advance := ts3.GoTime().Sub(ts2.GoTime()); advance != 0 {
t.Fatalf("expected clock not to advance, but it advanced by %s", advance)
}
val, _, err := engine.MVCCGet(context.Background(), mtc.engines[0], key, ts3,
engine.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if a, e := mustGetInt(val), incArgs.Increment*numCmds; a != e {
t.Errorf("expected %d, got %d", e, a)
}
// Raft entry application is asynchronous, so we may not see the update to
// the key immediately.
testutils.SucceedsSoon(t, func() error {
val, _, err := engine.MVCCGet(context.Background(), mtc.engines[0], key, ts3,
engine.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if a, e := mustGetInt(val), incArgs.Increment*numCmds; a != e {
return errors.Errorf("expected %d, got %d", e, a)
}
return nil
})
}

// TestTxnPutOutOfOrder tests a case where a put operation of an older
@@ -1811,19 +1816,24 @@ func TestClearRange(t *testing.T) {

verifyKeysWithPrefix := func(prefix roachpb.Key, expectedKeys []roachpb.Key) {
t.Helper()
start := engine.MakeMVCCMetadataKey(prefix)
end := engine.MakeMVCCMetadataKey(prefix.PrefixEnd())
kvs, err := engine.Scan(store.Engine(), start, end, 0 /* maxRows */)
if err != nil {
t.Fatal(err)
}
var actualKeys []roachpb.Key
for _, kv := range kvs {
actualKeys = append(actualKeys, kv.Key.Key)
}
if !reflect.DeepEqual(expectedKeys, actualKeys) {
t.Fatalf("expected %v, but got %v", expectedKeys, actualKeys)
}
// Raft entry application is asynchronous, so we may not see the updates
// to the keys immediately.
testutils.SucceedsSoon(t, func() error {
start := engine.MakeMVCCMetadataKey(prefix)
end := engine.MakeMVCCMetadataKey(prefix.PrefixEnd())
kvs, err := engine.Scan(store.Engine(), start, end, 0 /* maxRows */)
if err != nil {
t.Fatal(err)
}
var actualKeys []roachpb.Key
for _, kv := range kvs {
actualKeys = append(actualKeys, kv.Key.Key)
}
if !reflect.DeepEqual(expectedKeys, actualKeys) {
return errors.Errorf("expected %v, but got %v", expectedKeys, actualKeys)
}
return nil
})
}

rng, _ := randutil.NewPseudoRand()
3 changes: 3 additions & 0 deletions pkg/storage/client_split_test.go
@@ -683,6 +683,7 @@ func TestStoreRangeSplitStats(t *testing.T) {
storeCfg := storage.TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
storeCfg.TestingKnobs.DisableSplitQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableRaftAckBeforeApplication = true
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
store := createTestStoreWithConfig(t, stopper, storeCfg)
@@ -2593,6 +2594,7 @@ func TestUnsplittableRange(t *testing.T) {
}
cfg.TestingKnobs.SplitQueuePurgatoryChan = splitQueuePurgatoryChan
cfg.TestingKnobs.DisableMergeQueue = true
cfg.TestingKnobs.DisableRaftAckBeforeApplication = true
store := createTestStoreWithConfig(t, stopper, cfg)

// Add a single large row to /Table/14.
@@ -2774,6 +2776,7 @@ func TestStoreCapacityAfterSplit(t *testing.T) {
cfg := storage.TestStoreConfig(hlc.NewClock(manualClock.UnixNano, time.Nanosecond))
cfg.TestingKnobs.DisableSplitQueue = true
cfg.TestingKnobs.DisableMergeQueue = true
cfg.TestingKnobs.DisableRaftAckBeforeApplication = true
s := createTestStoreWithOpts(
t,
testStoreOpts{
1 change: 1 addition & 0 deletions pkg/storage/client_status_test.go
@@ -26,6 +26,7 @@ func TestComputeStatsForKeySpan(t *testing.T) {
defer leaktest.AfterTest(t)()
storeCfg := storage.TestStoreConfig(nil /* clock */)
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableRaftAckBeforeApplication = true
mtc := &multiTestContext{
storeConfig: &storeCfg,
}
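
Several of the test diffs above and below set a new DisableRaftAckBeforeApplication
testing knob because those tests read the storage engine immediately after a write
returns. The knob's wiring is not part of this excerpt; the following is only a
hedged sketch of how such a knob might gate the early-ack path (the field placement
and the helper function are assumptions made for illustration).

```go
package storagesketch

// StoreTestingKnobs sketches where a knob like this could live. CockroachDB
// does have a StoreTestingKnobs struct, but this field's real definition and
// the helper below are assumptions.
type StoreTestingKnobs struct {
	// DisableRaftAckBeforeApplication forces proposals to wait for state
	// machine application before acknowledging clients, restoring the
	// pre-change behavior for tests that read the engine synchronously.
	DisableRaftAckBeforeApplication bool
}

// canAckBeforeApplication reflects the policy described in the commit message:
// only transactional writes are acknowledged early, and the knob turns the
// optimization off entirely.
func canAckBeforeApplication(knobs *StoreTestingKnobs, isTransactionalWrite bool) bool {
	if knobs != nil && knobs.DisableRaftAckBeforeApplication {
		return false
	}
	return isTransactionalWrite
}
```
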
94 changes: 50 additions & 44 deletions pkg/storage/cmd_app_batch.go
@@ -20,16 +20,31 @@ import (
"go.etcd.io/etcd/raft/raftpb"
)

// cmdAppBatch accumulates state due to the application of raft
// commands. Committed raft commands are applied to the batch in a multi-stage
// process whereby individual commands are decoded, prepared for application
// relative to the current view of replicaState, committed to the storage
// engine, applied to the Replica's in-memory state and then finished by
// releasing their latches and notifying clients.
// entryGen is a generator of raft entries. cmdAppBatch uses the type when
// iterating over committed entries to decode and apply.
//
// The entry and next methods should only be called if valid returns true.
type entryGen []raftpb.Entry

func (g *entryGen) valid() bool { return len(*g) != 0 }
func (g *entryGen) entry() *raftpb.Entry { return &(*g)[0] }
func (g *entryGen) next() { *g = (*g)[1:] }

// cmdAppBatch accumulates state due to the application of raft commands.
// Committed raft commands are applied to the batch in a multi-stage process
// whereby individual commands are decoded, prepared for application relative to
// the current view of replicaState, committed to the storage engine, applied to
// the Replica's in-memory state and then finished by releasing their latches
// and notifying clients.
type cmdAppBatch struct {
// decodeBuf is used to decode an entry before adding it to the batch.
// See decode().
decodeBuf decodedRaftEntry
decodeBuf decodedRaftEntry
decodeBufFull bool

// cmdBuf is a buffer containing decoded raft entries that are ready to be
// applied in the same batch.
cmdBuf cmdAppCtxBuf

// batch accumulates writes implied by the raft entries in this batch.
batch engine.Batch
@@ -54,12 +69,10 @@ type cmdAppBatch struct {
// TODO(ajwerner): consider whether this logic should imply that commands
// which update truncated state are non-trivial.
updatedTruncatedState bool

cmdBuf cmdAppCtxBuf
}

// cmdAppBatch structs are needed to apply raft commands, which is to
// say, frequently, so best to pool them rather than allocated under the raftMu.
// cmdAppBatch structs are needed to apply raft commands, which is to say,
// frequently, so best to pool them rather than allocated under the raftMu.
var cmdAppBatchSyncPool = sync.Pool{
New: func() interface{} {
return new(cmdAppBatch)
@@ -70,7 +83,7 @@ func getCmdAppBatch() *cmdAppBatch {
return cmdAppBatchSyncPool.Get().(*cmdAppBatch)
}

func releaseCmdAppBatch(b *cmdAppBatch) {
func (b *cmdAppBatch) release() {
b.cmdBuf.clear()
*b = cmdAppBatch{}
cmdAppBatchSyncPool.Put(b)
@@ -83,49 +96,42 @@ func (b *cmdAppBatch) add(e *raftpb.Entry, d decodedRaftEntry) {
s.e = e
}

// decode decodes commands from toProcess until either all of the commands have
// decode decodes commands from gen until either all of the commands have
// been added to the batch or a non-trivial command is decoded. Non-trivial
// commands must always be in their own batch. If a non-trivial command is
// encountered the batch is returned immediately without adding the newly
// decoded command to the batch or removing it from remaining.
// It is the client's responsibility to deal with non-trivial commands.
//
// numEmptyEntries indicates the number of entries in the consumed portion of
// toProcess contained a zero-byte payload.
func (b *cmdAppBatch) decode(
ctx context.Context, toProcess []raftpb.Entry, decodeBuf *decodedRaftEntry,
) (
foundNonTrivialEntry bool,
numEmptyEntries int,
remaining []raftpb.Entry,
errExpl string,
err error,
) {
for len(toProcess) > 0 {
e := &toProcess[0]
if len(e.Data) == 0 {
numEmptyEntries++
}
if errExpl, err = decodeBuf.decode(ctx, e); err != nil {
return false, numEmptyEntries, nil, errExpl, err
// decoded command to the batch or removing it from remaining. It is the
// client's responsibility to deal with non-trivial commands.
func (b *cmdAppBatch) decode(ctx context.Context, gen *entryGen) (errExpl string, err error) {
for gen.valid() {
e := gen.entry()
if errExpl, err = b.decodeBuf.decode(ctx, e); err != nil {
return errExpl, err
}
b.decodeBufFull = true
// This is a non-trivial entry which needs to be processed alone.
foundNonTrivialEntry = !isTrivial(decodeBuf.replicatedResult(),
b.replicaState.UsingAppliedStateKey)
if foundNonTrivialEntry {
if !isTrivial(b.decodeBuf.replicatedResult(), b.replicaState.UsingAppliedStateKey) {
break
}
// We're going to process this entry in this batch so pop it from toProcess
// and add to appStates.
toProcess = toProcess[1:]
b.add(e, *decodeBuf)
// We're going to process this entry in this batch so add to the
// cmdBuf and advance the generator.
b.add(e, b.popDecodeBuf())
gen.next()
}
return foundNonTrivialEntry, numEmptyEntries, toProcess, "", nil
return "", nil
}

func (b *cmdAppBatch) popDecodeBuf() decodedRaftEntry {
b.decodeBufFull = false
return b.decodeBuf
}

func (b *cmdAppBatch) reset() {
// resetBatch resets the accumulate batch state in the cmdAppBatch.
// However, it does not reset the receiver's decode buffer.
func (b *cmdAppBatch) resetBatch() {
b.cmdBuf.clear()
*b = cmdAppBatch{
decodeBuf: b.decodeBuf, // preserve the previously decoded entry
decodeBuf: b.decodeBuf, // preserve the previously decoded entry
decodeBufFull: b.decodeBufFull,
}
}
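
For readers unfamiliar with the generator pattern used in the diff above, here is a
standalone usage sketch of entryGen. It re-declares the type outside the storage
package (copied from the diff) so it compiles on its own; the entries themselves
are fabricated for the example.

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/raft/raftpb"
)

// entryGen mirrors the generator added in cmd_app_batch.go: a slice of Raft
// entries consumed from the front while decoding a batch.
type entryGen []raftpb.Entry

func (g *entryGen) valid() bool          { return len(*g) != 0 }
func (g *entryGen) entry() *raftpb.Entry { return &(*g)[0] }
func (g *entryGen) next()                { *g = (*g)[1:] }

func main() {
	gen := entryGen{{Index: 10}, {Index: 11}, {Index: 12}}
	for gen.valid() {
		fmt.Println("decoding committed entry at index", gen.entry().Index)
		gen.next()
	}
}
```
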
9 changes: 7 additions & 2 deletions pkg/storage/gc_queue_test.go
@@ -843,10 +843,15 @@ func TestGCQueueTransactionTable(t *testing.T) {
func TestGCQueueIntentResolution(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tc := testContext{}
tc := testContext{manualClock: hlc.NewManualClock(123)}
tsc := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
// Ensure that writes have been applied before being acknowledged.
// The GC queue scans the storage engine directly and we want to
// make sure it doesn't miss any intents.
tsc.TestingKnobs.DisableRaftAckBeforeApplication = true
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(t, stopper)
tc.StartWithStoreConfig(t, stopper, tsc)

tc.manualClock.Set(48 * 60 * 60 * 1E9) // 2d past the epoch
now := tc.Clock().Now().WallTime
6 changes: 6 additions & 0 deletions pkg/storage/raft_log_queue_test.go
@@ -715,6 +715,9 @@ func TestTruncateLog(t *testing.T) {
tc := testContext{}
cfg := TestStoreConfig(nil)
cfg.TestingKnobs.DisableRaftLogQueue = true
// Ensure that writes have been applied (and updated the replica's
// last index with their log index) before being acknowledged.
cfg.TestingKnobs.DisableRaftAckBeforeApplication = true
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.StartWithStoreConfig(t, stopper, cfg)
@@ -881,6 +884,9 @@ func TestTruncateLogRecompute(t *testing.T) {
}
cfg := TestStoreConfig(nil)
cfg.TestingKnobs.DisableRaftLogQueue = true
// Ensure that writes have been applied (and updated the replica's
// last index with their log index) before being acknowledged.
cfg.TestingKnobs.DisableRaftAckBeforeApplication = true
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.StartWithStoreConfig(t, stopper, cfg)
(The remaining changed files in this commit are not shown.)
