From 87aaea70d68903ad04ecae1adf2910333aa07a78 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 16 Jul 2019 20:03:16 -0400
Subject: [PATCH] storage: ack Raft proposals after Raft log commit, not state machine apply
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Informs #17500.

This is a partial revival of #18710 and a culmination of more recent thinking
in https://github.com/cockroachdb/cockroach/issues/17500#issuecomment-467238817.

The change adjusts the Raft processing loop so that it acknowledges the success
of raft entries as soon as it learns that they have been durably committed to
the raft log instead of after they have been applied to the proposer replica's
replicated state machine. This not only pulls the application latency out of
the hot path for Raft proposals, but it also pulls the next raft ready
iteration's write to its Raft log (with the associated fsync) out of the hot
path for Raft proposals.

This is safe because a proposal through raft is known to have succeeded as
soon as it is replicated to a quorum of replicas (i.e. has committed in the
raft log). The proposal does not need to wait for its effects to be applied in
order to know whether its changes will succeed or fail. The raft log is the
provider of atomicity and durability for replicated writes, not (ignoring log
truncation) the replicated state machine itself, so a client can be confident
in the result of a write as soon as the raft log confirms that it has
succeeded.

However, there are a few complications to acknowledging the success of a
proposal at this stage:

1. Committing an entry in the raft log and having the command in that entry
succeed are similar but not equivalent concepts. Even if the entry succeeds in
achieving durability by replicating to a quorum of replicas, its command may
still be rejected "beneath raft". This means that a (deterministic) check
after replication decides that the command will not be applied to the
replicated state machine. In that case, the client waiting on the result of
the command should not be informed of its success. Luckily, this check is
cheap to perform so we can do it here and when applying the command. See
Replica.shouldApplyCommand.

2. Some commands perform non-trivial work such as updating Replica
configuration state or performing Range splits. In those cases, it's likely
that the client is interested in not only knowing whether it has succeeded in
sequencing the change in the raft log, but also in knowing when the change has
gone into effect. There's currently no exposed hook to ask for an
acknowledgement only after a command has been applied, so for simplicity the
current implementation only ever acks transactional writes before they have
gone into effect. All other commands wait until they have been applied to ack
their client.

3. Even though we can determine whether a command has succeeded without
applying it, the effect of the command will not be visible to conflicting
commands until it is applied. Because of this, the client can be informed of
the success of a write at this point, but we cannot release that write's
latches until the write has applied. See
ProposalData.signalProposalResult/finishApplication.
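To make the acknowledgement policy implied by the three points above concrete,
here is a minimal, self-contained sketch. The `cmd` struct and `canAckEarly`
helper are hypothetical stand-ins, not the patch's code; in the patch itself
the equivalent gate is the `!cmd.Rejected() && cmd.IsLocal() &&
cmd.CanAckBeforeApplication()` check inside
apply.Task.AckCommittedEntriesBeforeApplication.

```go
package main

import "fmt"

// cmd is a hypothetical stand-in for a command that has been committed to the
// raft log and checked "beneath raft", but not yet applied.
type cmd struct {
	index      uint64
	local      bool // proposed by this replica?
	rejected   bool // will the deterministic below-raft check reject it?
	isTxnWrite bool // only transactional writes are acked before applying
}

// canAckEarly mirrors the policy described above: a command may be
// acknowledged after log commit but before application only if it was
// proposed locally, will not be rejected beneath raft, and is a request type
// whose client does not need to observe the command's effects at ack time.
// Latches are still held until the command actually applies.
func canAckEarly(c cmd) bool {
	return c.local && !c.rejected && c.isTxnWrite
}

func main() {
	cmds := []cmd{
		{index: 1, local: true, rejected: false, isTxnWrite: true},  // acked early
		{index: 2, local: false, rejected: false, isTxnWrite: true}, // remote: nobody to ack
		{index: 3, local: true, rejected: true, isTxnWrite: true},   // rejected beneath raft
		{index: 4, local: true, rejected: false, isTxnWrite: false}, // e.g. a split: waits for application
	}
	for _, c := range cmds {
		fmt.Printf("command %d: ack before application = %t\n", c.index, canAckEarly(c))
	}
}
```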
\### Benchmarks

The change appears to result in an **8-10%** improvement to throughput and a
**6-10%** reduction in p50 latency across the board on kv0. I ran a series of
tests with different node sizes and different workload concurrencies and the
win seemed pretty stable. This was also true regardless of whether the writes
were to a single Raft group or a large number of Raft groups.

```
name                           old ops/sec  new ops/sec  delta
kv0/cores=16/nodes=3/conc=32    24.1k ± 0%   26.1k ± 1%   +8.35%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48    30.4k ± 1%   32.8k ± 1%   +8.02%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64    34.6k ± 1%   37.6k ± 0%   +8.79%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72    46.6k ± 1%   50.8k ± 0%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108   58.8k ± 1%   64.0k ± 1%   +8.99%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144   68.1k ± 1%   74.5k ± 1%   +9.45%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=144   55.8k ± 1%   59.7k ± 2%   +7.12%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216   64.4k ± 4%   68.1k ± 4%   +5.65%  (p=0.016 n=5+5)
kv0/cores=72/nodes=3/conc=288   68.8k ± 2%   74.5k ± 3%   +8.39%  (p=0.008 n=5+5)

name                           old p50(ms)  new p50(ms)  delta
kv0/cores=16/nodes=3/conc=32     1.30 ± 0%    1.20 ± 0%   -7.69%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=48     1.50 ± 0%    1.40 ± 0%   -6.67%  (p=0.008 n=5+5)
kv0/cores=16/nodes=3/conc=64     1.70 ± 0%    1.60 ± 0%   -5.88%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     1.40 ± 0%    1.30 ± 0%   -7.14%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    1.60 ± 0%    1.50 ± 0%   -6.25%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    1.84 ± 3%    1.70 ± 0%   -7.61%  (p=0.000 n=5+4)
kv0/cores=72/nodes=3/conc=144    2.00 ± 0%    1.80 ± 0%  -10.00%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    2.46 ± 2%    2.20 ± 0%  -10.57%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=288    2.80 ± 0%    2.60 ± 0%   -7.14%  (p=0.079 n=4+5)

name                           old p99(ms)  new p99(ms)  delta
kv0/cores=16/nodes=3/conc=32     3.50 ± 0%    3.50 ± 0%     ~     (all equal)
kv0/cores=16/nodes=3/conc=48     4.70 ± 0%    4.58 ± 3%     ~     (p=0.167 n=5+5)
kv0/cores=16/nodes=3/conc=64     5.50 ± 0%    5.20 ± 0%   -5.45%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=72     5.00 ± 0%    4.70 ± 0%   -6.00%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=108    5.80 ± 0%    5.50 ± 0%   -5.17%  (p=0.008 n=5+5)
kv0/cores=36/nodes=3/conc=144    6.48 ± 3%    6.18 ± 3%   -4.63%  (p=0.079 n=5+5)
kv0/cores=72/nodes=3/conc=144    11.0 ± 0%    10.5 ± 0%   -4.55%  (p=0.008 n=5+5)
kv0/cores=72/nodes=3/conc=216    13.4 ± 2%    13.2 ± 5%     ~     (p=0.683 n=5+5)
kv0/cores=72/nodes=3/conc=288    18.2 ± 4%    17.2 ± 3%   -5.70%  (p=0.079 n=5+5)
```

Release note (performance improvement): Raft entries no longer wait to be
applied to the RocksDB storage engine before signaling their success to
clients; they now only wait until they are committed in their Raft log.
---
 pkg/storage/apply/cmd.go                      |  20 +++
 pkg/storage/apply/doc.go                      |   6 +-
 pkg/storage/apply/doc_test.go                 |  43 ++++++-
 pkg/storage/apply/task.go                     | 112 ++++++++++++++++-
 pkg/storage/apply/task_test.go                | 114 ++++++++++++++++--
 pkg/storage/client_raft_test.go               | 104 ++++++++++++++++
 pkg/storage/replica_application_cmd.go        |  33 +++++
 .../replica_application_state_machine.go      |  43 ++++++-
 pkg/storage/replica_raft.go                   |  42 +++++--
 pkg/storage/store_test.go                     |   8 +-
 10 files changed, 492 insertions(+), 33 deletions(-)

diff --git a/pkg/storage/apply/cmd.go b/pkg/storage/apply/cmd.go
index 668a9ebe2c56..7ac74b17e2fb 100644
--- a/pkg/storage/apply/cmd.go
+++ b/pkg/storage/apply/cmd.go
@@ -33,6 +33,13 @@ type CheckedCommand interface {
 	Command
 	// Rejected returns whether the command was rejected.
 	Rejected() bool
+	// CanAckBeforeApplication returns whether the success of the command
+	// can be acknowledged before the command has been applied to the state
+	// machine.
+	CanAckBeforeApplication() bool
+	// AckSuccess acknowledges the success of the command to its client.
+	// Must only be called if !Rejected.
+	AckSuccess() error
 }
 
 // AppliedCommand is a command that has been applied to the replicated state
@@ -182,6 +189,19 @@ func mapCheckedCmdIter(
 	return ret, nil
 }
 
+// forEachCheckedCmdIter calls a closure on each command in the provided
+// iterator. The function closes the provided iterator.
+func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error {
+	defer iter.Close()
+	for iter.Valid() {
+		if err := fn(iter.CurChecked()); err != nil {
+			return err
+		}
+		iter.Next()
+	}
+	return nil
+}
+
 // forEachAppliedCmdIter calls a closure on each command in the provided
 // iterator. The function closes the provided iterator.
 func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error {
diff --git a/pkg/storage/apply/doc.go b/pkg/storage/apply/doc.go
index bf8d51bc4eee..c23e83ae7efc 100644
--- a/pkg/storage/apply/doc.go
+++ b/pkg/storage/apply/doc.go
@@ -89,7 +89,11 @@ evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the
 completion of that change, client responses are determined before replication
 begins. The only remaining work to be done after replication of a command
 succeeds is to determine whether it will be rejected and replaced by an empty
-command.
+command. To facilitate this acknowledgement as early as possible, this package
+provides the ability to acknowledge a series of commands before applying them to
+the state machine. Outcomes are determined before performing any durable work by
+stepping commands through an in-memory "ephemeral" copy of the state machine.
+For more, see Task.AckCommittedEntriesBeforeApplication.
 
 A final challenge comes from the desire to properly prioritize the application
 of commands across multiple state machines in systems like CockroachDB where
diff --git a/pkg/storage/apply/doc_test.go b/pkg/storage/apply/doc_test.go
index 1996ea973deb..7ef44292ca48 100644
--- a/pkg/storage/apply/doc_test.go
+++ b/pkg/storage/apply/doc_test.go
@@ -29,22 +29,46 @@ func ExampleTask() {
 	dec.nonLocal[6] = true
 	dec.shouldReject[3] = true
 	dec.shouldReject[6] = true
+	fmt.Print(`
+Setting up a batch of seven log entries:
+ - index 2 and 6 are non-local
+ - index 3 and 6 will be rejected
+ - index 5 is not trivial
+`)
 
 	t := apply.MakeTask(sm, dec)
 	defer t.Close()
 
-	fmt.Println("Decode:")
+	fmt.Println("\nDecode (note that index 2 and 6 are not local):")
 	if err := t.Decode(ctx, ents); err != nil {
 		panic(err)
 	}
+
+	fmt.Println("\nAckCommittedEntriesBeforeApplication:")
+	if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
+		panic(err)
+	}
+	fmt.Print(`
+Above, only index 1 and 4 get acked early. The command at 5 is
+non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
+must be in the first batch to qualify for acking early. 2 is not local
+(so there's nobody to ack), and 3 is rejected. We can't ack rejected
+commands early because the state machine is free to handle them any way
+it likes.
+`)
+
+	fmt.Println("\nApplyCommittedEntries:")
 	if err := t.ApplyCommittedEntries(ctx); err != nil {
 		panic(err)
 	}
 
 	// Output:
 	//
-	// Decode:
+	// Setting up a batch of seven log entries:
+	//  - index 2 and 6 are non-local
+	//  - index 3 and 6 will be rejected
+	//  - index 5 is not trivial
+	//
+	// Decode (note that index 2 and 6 are not local):
 	// decoding command 1; local=true
 	// decoding command 2; local=false
 	// decoding command 3; local=true
@@ -53,16 +77,27 @@ func ExampleTask() {
 	// decoding command 6; local=false
 	// decoding command 7; local=true
 	//
+	// AckCommittedEntriesBeforeApplication:
+	// acknowledging command 1 before application
+	// acknowledging command 4 before application
+	//
+	// Above, only index 1 and 4 get acked early. The command at 5 is
+	// non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
+	// must be in the first batch to qualify for acking early. 2 is not local
+	// (so there's nobody to ack), and 3 is rejected. We can't ack rejected
+	// commands early because the state machine is free to handle them any way
+	// it likes.
+	//
 	// ApplyCommittedEntries:
 	// applying batch with commands=[1 2 3 4]
 	// applying side-effects of command 1
 	// applying side-effects of command 2
 	// applying side-effects of command 3
 	// applying side-effects of command 4
-	// finishing and acknowledging command 1; rejected=false
+	// finishing command 1; rejected=false
 	// finishing and acknowledging command 2; rejected=false
 	// finishing and acknowledging command 3; rejected=true
-	// finishing and acknowledging command 4; rejected=false
+	// finishing command 4; rejected=false
 	// applying batch with commands=[5]
 	// applying side-effects of command 5
 	// finishing and acknowledging command 5; rejected=false
diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go
index 681d86779c50..8bfa62acd0a1 100644
--- a/pkg/storage/apply/task.go
+++ b/pkg/storage/apply/task.go
@@ -29,7 +29,15 @@ type StateMachine interface {
 	// effects that a group of Commands will have on the replicated state
 	// machine. Commands are staged in the batch one-by-one and then the
 	// entire batch is committed at once.
-	NewBatch() Batch
+	//
+	// Batch comes in two flavors - real batches and ephemeral batches.
+	// Real batches are capable of accumulating updates from commands and
+	// applying them to the state machine. Ephemeral batches are not able
+	// to make changes to the durable state machine, but can still be used
+	// for the purpose of checking commands to determine whether they will
+	// be rejected or not when staged in a real batch. The principal user
+	// of ephemeral batches is AckCommittedEntriesBeforeApplication.
+	NewBatch(ephemeral bool) Batch
 	// ApplySideEffects applies the in-memory side-effects of a Command to
 	// the replicated state machine. The method will be called in the order
 	// that the commands are committed to the state machine's log. Once the
@@ -117,6 +125,106 @@ func (t *Task) assertDecoded() {
 	}
 }
 
+// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of
+// raft entries that have been durably committed to the raft log but have not
+// yet been applied to the proposer replica's replicated state machine.
+//
+// This is safe because a proposal through raft can be known to have succeeded
+// as soon as it is durably replicated to a quorum of replicas (i.e. has
+// committed in the raft log). The proposal does not need to wait for the
+// effects of the proposal to be applied in order to know whether its changes
+// will succeed or fail. This is because the raft log is the provider of
+// atomicity and durability for replicated writes, not (ignoring log
+// truncation) the replicated state machine itself.
+//
+// However, there are a few complications to acknowledging the success of a
+// proposal at this stage:
+//
+// 1. Committing an entry in the raft log and having the command in that entry
+// succeed are similar but not equivalent concepts. Even if the entry succeeds
+// in achieving durability by replicating to a quorum of replicas, its command
+// may still be rejected "beneath raft". This means that a (deterministic)
+// check after replication decides that the command will not be applied to the
+// replicated state machine. In that case, the client waiting on the result of
+// the command should not be informed of its success. Luckily, this check is
+// cheap to perform so we can do it here and when applying the command.
+//
+// Determining whether the command will succeed or be rejected before applying
+// it for real is accomplished using an ephemeral batch. Commands are staged in
+// the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
+// immediately even though the ephemeral batch itself cannot be used to update
+// the durable state machine. Once the rejection status of each command is
+// determined, any successful commands that permit acknowledgement before
+// application (see CanAckBeforeApplication) are acknowledged. The ephemeral
+// batch is then thrown away.
+//
+// 2. Some commands perform non-trivial work such as updating Replica configuration
+// state or performing Range splits. In those cases, it's likely that the client
+// is interested in not only knowing whether it has succeeded in sequencing the
+// change in the raft log, but also in knowing when the change has gone into
+// effect. There's currently no exposed hook to ask for an acknowledgement only
+// after a command has been applied, so for simplicity the current implementation
+// only ever acks transactional writes before they have gone into effect. All
+// other commands wait until they have been applied to ack their client.
+//
+// 3. Even though we can determine whether a command has succeeded without applying
+// it, the effect of the command will not be visible to conflicting commands until
+// it is applied. Because of this, the client can be informed of the success of
+// a write at this point, but we cannot release that write's latches until the
+// write has applied. See ProposalData.signalProposalResult/finishApplication.
+//
+// 4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
+// haven't actually been appended to our own log. This is most common in single
+// node replication groups, but it is possible when a follower in a multi-node
+// replication group is catching up after falling behind. In the first case,
+// the entries are not yet committed so acknowledging them would be a lie. In
+// the second case, the entries are committed so we could acknowledge them at
+// this point, but doing so seems risky. To avoid complications in either case,
+// the method takes a maxIndex parameter that limits the indexes that it will
+// acknowledge. Typically, callers will supply the highest index that they have
+// durably written to their raft log for this upper bound.
+//
+func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
+	t.assertDecoded()
+	if !t.anyLocal {
+		return nil // fast-path
+	}
+
+	// Create a new ephemeral application batch. All we're interested in is
+	// whether commands will be rejected or not when staged in a real batch.
+	batch := t.sm.NewBatch(true /* ephemeral */)
+	defer batch.Close()
+
+	iter := t.dec.NewCommandIter()
+	defer iter.Close()
+
+	// Collect a batch of trivial commands from the applier. Stop at the first
+	// non-trivial command or at the first command with an index above maxIndex.
+	batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
+		if cmd.Index() > maxIndex {
+			return false
+		}
+		return cmd.IsTrivial()
+	})
+
+	// Stage the commands in the (ephemeral) batch.
+	stagedIter, err := mapCmdIter(batchIter, batch.Stage)
+	if err != nil {
+		return err
+	}
+
+	// Acknowledge any locally-proposed commands that succeeded in being staged
+	// in the batch and can be acknowledged before they are actually applied.
+	// Don't acknowledge rejected proposals early because the StateMachine may
+	// want to retry the command instead of returning the error to the client.
+	return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error {
+		if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
+			return cmd.AckSuccess()
+		}
+		return nil
+	})
+}
+
 // SetMaxBatchSize sets the maximum application batch size. If 0, no limit
 // will be placed on the number of commands that can be applied in a batch.
 func (t *Task) SetMaxBatchSize(size int) {
@@ -144,7 +252,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
 //    b) exactly one non-trivial command
 func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
 	// Create a new application batch.
-	batch := t.sm.NewBatch()
+	batch := t.sm.NewBatch(false /* ephemeral */)
 	defer batch.Close()
 
 	// Consume a batch-worth of commands.
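For orientation, the expected calling sequence on an apply.Task during a raft
ready iteration looks roughly like the sketch below. It mirrors ExampleTask in
doc_test.go and the call sites in replica_raft.go; the helper itself (and its
parameters) is hypothetical and not part of this patch, and error handling
such as SetMaxBatchSize and the nonDeterministicFailure unwrapping is elided.

```go
package storage // sketch: imagine this living next to replica_raft.go

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/apply"
	"go.etcd.io/etcd/raft/raftpb"
)

// applyReadyEntries is a hypothetical helper showing the order in which the
// ready loop is expected to use an apply.Task: decode, ack early (bounded by
// lastIndex), then apply.
func applyReadyEntries(
	ctx context.Context,
	sm apply.StateMachine,
	dec apply.Decoder,
	committed []raftpb.Entry,
	lastIndex uint64, // highest index durably written to the local raft log
) error {
	task := apply.MakeTask(sm, dec)
	defer task.Close()

	// Decode the committed-but-unapplied entries.
	if err := task.Decode(ctx, committed); err != nil {
		return err
	}
	// Acknowledge qualifying local proposals now; see point 4 of the comment
	// on AckCommittedEntriesBeforeApplication for why lastIndex bounds this.
	if err := task.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil {
		return err
	}
	// Apply the entries to the state machine; remaining proposals are acked
	// as they finish applying.
	return task.ApplyCommittedEntries(ctx)
}
```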
diff --git a/pkg/storage/apply/task_test.go b/pkg/storage/apply/task_test.go
index 322b058e7233..d68970f2963f 100644
--- a/pkg/storage/apply/task_test.go
+++ b/pkg/storage/apply/task_test.go
@@ -16,6 +16,7 @@ import (
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/storage/apply"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/raft/raftpb"
 )
@@ -50,15 +51,29 @@ type appliedCmd struct {
 	*checkedCmd
 }
 
-func (c *cmd) Index() uint64         { return c.index }
-func (c *cmd) IsTrivial() bool       { return !c.nonTrivial }
-func (c *cmd) IsLocal() bool         { return !c.nonLocal }
-func (c *checkedCmd) Rejected() bool { return c.rejected }
-func (c *appliedCmd) FinishAndAckOutcome() error {
-	c.finished = true
+func (c *cmd) Index() uint64                        { return c.index }
+func (c *cmd) IsTrivial() bool                      { return !c.nonTrivial }
+func (c *cmd) IsLocal() bool                        { return !c.nonLocal }
+func (c *checkedCmd) Rejected() bool                { return c.rejected }
+func (c *checkedCmd) CanAckBeforeApplication() bool { return true }
+func (c *checkedCmd) AckSuccess() error {
 	c.acked = true
 	if logging {
-		fmt.Printf("  finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected())
+		fmt.Printf("  acknowledging command %d before application\n", c.Index())
+	}
+	return nil
+}
+func (c *appliedCmd) FinishAndAckOutcome() error {
+	c.finished = true
+	if c.acked {
+		if logging {
+			fmt.Printf("  finishing command %d; rejected=%t\n", c.Index(), c.Rejected())
+		}
+	} else {
+		if logging {
+			fmt.Printf("  finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected())
+		}
+		c.acked = true
 	}
 	return nil
 }
@@ -106,12 +121,12 @@ func getTestStateMachine() *testStateMachine {
 	return new(testStateMachine)
 }
 
-func (sm *testStateMachine) NewBatch() apply.Batch {
+func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch {
 	if sm.batchOpen {
 		panic("batch not closed")
 	}
 	sm.batchOpen = true
-	return &testBatch{sm: sm}
+	return &testBatch{sm: sm, ephemeral: ephemeral}
 }
 func (sm *testStateMachine) ApplySideEffects(
 	cmdI apply.CheckedCommand,
@@ -126,8 +141,9 @@ func (sm *testStateMachine) ApplySideEffects(
 }
 
 type testBatch struct {
-	sm     *testStateMachine
-	staged []uint64
+	sm        *testStateMachine
+	ephemeral bool
+	staged    []uint64
 }
 
 func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
@@ -137,6 +153,9 @@ func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
 	return &ccmd, nil
 }
 func (b *testBatch) ApplyToStateMachine(_ context.Context) error {
+	if b.ephemeral {
+		return errors.New("can't commit an ephemeral batch")
+	}
 	b.sm.batches = append(b.sm.batches, b.staged)
 	b.sm.applied = append(b.sm.applied, b.staged...)
 	if logging {
@@ -255,3 +274,76 @@ func TestApplyCommittedEntriesWithBatchSize(t *testing.T) {
 		require.True(t, cmd.finished)
 	}
 }
+
+func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
+	ctx := context.Background()
+	ents := makeEntries(9)
+
+	sm := getTestStateMachine()
+	dec := newTestDecoder()
+	dec.nonTrivial[6] = true
+	dec.nonTrivial[7] = true
+	dec.nonTrivial[9] = true
+	dec.nonLocal[2] = true
+	dec.shouldReject[3] = true
+
+	// Use an apply.Task to ack all commands before applying them.
+	appT := apply.MakeTask(sm, dec)
+	defer appT.Close()
+	require.NoError(t, appT.Decode(ctx, ents))
+	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */))
+
+	// Assert that the state machine was not updated.
+	require.Equal(t, testStateMachine{}, *sm)
+
+	// Assert that some commands were acknowledged early and that none were finished.
+	for _, cmd := range dec.cmds {
+		var exp bool
+		switch cmd.index {
+		case 1, 4, 5:
+			exp = true // local and successful
+		case 2:
+			exp = false // remote
+		case 3:
+			exp = false // local and rejected
+		default:
+			exp = false // after first non-trivial cmd
+		}
+		require.Equal(t, exp, cmd.acked)
+		require.False(t, cmd.finished)
+	}
+
+	// Try again with a lower maximum log index.
+	appT.Close()
+	ents = makeEntries(5)
+
+	dec = newTestDecoder()
+	dec.nonLocal[2] = true
+	dec.shouldReject[3] = true
+
+	appT = apply.MakeTask(sm, dec)
+	require.NoError(t, appT.Decode(ctx, ents))
+	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))
+
+	// Assert that the state machine was not updated.
+	require.Equal(t, testStateMachine{}, *sm)
+
+	// Assert that some commands were acknowledged early and that none were finished.
+	for _, cmd := range dec.cmds {
+		var exp bool
+		switch cmd.index {
+		case 1, 4:
+			exp = true // local and successful
+		case 2:
+			exp = false // remote
+		case 3:
+			exp = false // local and rejected
+		case 5:
+			exp = false // index too high
+		default:
+			t.Fatalf("unexpected index %d", cmd.index)
+		}
+		require.Equal(t, exp, cmd.acked)
+		require.False(t, cmd.finished)
+	}
+}
diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index 15f487ca7fbb..be1867910313 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -4554,3 +4554,107 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 	require.Nil(t, pErr, "expected to succeed after healing the partition")
 	mtc.waitForValues(keyA, []int64{3, 3, 3})
 }
+
+// TestAckWriteBeforeApplication tests that the success of transactional writes
+// is acknowledged after those writes have been committed to a Range's Raft log
+// but before those writes have been applied to its replicated state machine.
+func TestAckWriteBeforeApplication(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	for _, tc := range []struct {
+		repls            int
+		expAckBeforeAppl bool
+	}{
+		// In a single-replica Range, each handleRaftReady iteration will append
+		// new entries to the Raft log and immediately apply them. This prevents
+		// "early acknowledgement" from being possible or useful. See the comment
+		// on apply.Task.AckCommittedEntriesBeforeApplication.
+		{1, false},
+		// In a three-replica Range, each handleRaftReady iteration will append
+		// a set of entries to the Raft log and then apply the previous set of
+		// entries. This makes "early acknowledgement" a major optimization, as
+		// it pulls the entire latency required to append the next set of entries
+		// to the Raft log out of the client-perceived latency of the previous
+		// set of entries.
+		{3, true},
+	} {
+		t.Run(fmt.Sprintf("numRepls=%d", tc.repls), func(t *testing.T) {
+			var filterActive int32
+			var magicTS hlc.Timestamp
+			blockPreApplication, blockPostApplication := make(chan struct{}), make(chan struct{})
+			applyFilterFn := func(ch chan struct{}) storagebase.ReplicaApplyFilter {
+				return func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
+					if atomic.LoadInt32(&filterActive) == 1 && filterArgs.Timestamp == magicTS {
+						<-ch
+					}
+					return 0, nil
+				}
+			}
+
+			tsc := storage.TestStoreConfig(nil)
+			tsc.TestingKnobs.TestingApplyFilter = applyFilterFn(blockPreApplication)
+			tsc.TestingKnobs.TestingPostApplyFilter = applyFilterFn(blockPostApplication)
+
+			mtc := &multiTestContext{storeConfig: &tsc}
+			defer mtc.Stop()
+			mtc.Start(t, tc.repls)
+
+			// Replicate the Range, if necessary.
+			key := roachpb.Key("a")
+			rangeID := mtc.stores[0].LookupReplica(roachpb.RKey(key)).RangeID
+			for i := 1; i < tc.repls; i++ {
+				mtc.replicateRange(rangeID, i)
+			}
+
+			// Begin performing a write on the Range.
+			magicTS = mtc.stores[0].Clock().Now()
+			atomic.StoreInt32(&filterActive, 1)
+			ch := make(chan *roachpb.Error, 1)
+			go func() {
+				ctx := context.Background()
+				put := putArgs(key, []byte("val"))
+				_, pErr := client.SendWrappedWith(ctx, mtc.stores[0].TestSender(), roachpb.Header{
+					Timestamp: magicTS,
+				}, put)
+				ch <- pErr
+			}()
+
+			expResult := func() {
+				t.Helper()
+				if pErr := <-ch; pErr != nil {
+					t.Fatalf("unexpected proposal result error: %v", pErr)
+				}
+			}
+			dontExpResult := func() {
+				t.Helper()
+				select {
+				case <-time.After(10 * time.Millisecond):
+					// Expected.
+				case pErr := <-ch:
+					t.Fatalf("unexpected proposal acknowledged before TestingApplyFilter: %v", pErr)
+				}
+			}
+
+			// The result should be blocked on the pre-apply filter.
+			dontExpResult()
+
+			// Release the pre-apply filter.
+			close(blockPreApplication)
+			// Depending on the cluster configuration, the result may not be blocked
+			// on the post-apply filter because the write may be able to acknowledge
+			// its client before applying.
+			if tc.expAckBeforeAppl {
+				expResult()
+			} else {
+				dontExpResult()
+			}
+
+			// Stop blocking Raft application to allow everything to shut down cleanly.
+			// This also confirms that the proposal does eventually apply.
+			close(blockPostApplication)
+			// If we didn't expect an acknowledgement before, we do now.
+			if !tc.expAckBeforeAppl {
+				expResult()
+			}
+		})
+	}
+}
diff --git a/pkg/storage/replica_application_cmd.go b/pkg/storage/replica_application_cmd.go
index 5524bcc0c47a..7712eb826917 100644
--- a/pkg/storage/replica_application_cmd.go
+++ b/pkg/storage/replica_application_cmd.go
@@ -116,6 +116,39 @@ func (c *replicatedCmd) Rejected() bool {
 	return c.forcedErr != nil
 }
 
+// CanAckBeforeApplication implements the apply.CheckedCommand interface.
+func (c *replicatedCmd) CanAckBeforeApplication() bool {
+	// CanAckBeforeApplication determines whether the request type is compatible
+	// with acknowledgement of success before it has been applied. For now, this
+	// determination is based on whether the request is performing transactional
+	// writes. This could be exposed as an option on the batch header instead.
+	//
+	// We don't try to ack async consensus writes before application because we
+	// know that there isn't a client waiting for the result.
+	req := c.proposal.Request
+	return req.IsTransactionWrite() && !req.AsyncConsensus
+}
+
+// AckSuccess implements the apply.CheckedCommand interface.
+func (c *replicatedCmd) AckSuccess() error {
+	if !c.IsLocal() {
+		return nil
+	}
+
+	// Signal the proposal's response channel with the result.
+	// Make a copy of the response to avoid data races between client mutations
+	// of the response and use of the response in endCmds.done when the command
+	// is finished.
+	var resp proposalResult
+	reply := *c.proposal.Local.Reply
+	reply.Responses = append([]roachpb.ResponseUnion(nil), reply.Responses...)
+	resp.Reply = &reply
+	resp.Intents = c.proposal.Local.DetachIntents()
+	resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */)
+	c.proposal.signalProposalResult(resp)
+	return nil
+}
+
 // FinishAndAckOutcome implements the apply.AppliedCommand interface.
 func (c *replicatedCmd) FinishAndAckOutcome() error {
 	tracing.FinishSpan(c.sp)
diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go
index c9664188f3bf..02f3ffb889d2 100644
--- a/pkg/storage/replica_application_state_machine.go
+++ b/pkg/storage/replica_application_state_machine.go
@@ -105,8 +105,10 @@ func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped }
 // side-effects of each command is applied to the Replica's in-memory state.
 type replicaStateMachine struct {
 	r *Replica
-	// batch is returned from NewBatch().
+	// batch is returned from NewBatch(false /* ephemeral */).
 	batch replicaAppBatch
+	// ephemeralBatch is returned from NewBatch(true /* ephemeral */).
+	ephemeralBatch ephemeralReplicaAppBatch
 	// stats are updated during command application and reset by moveStats.
 	stats applyCommittedEntriesStats
 }
@@ -321,8 +323,16 @@ func checkForcedErr(
 }
 
 // NewBatch implements the apply.StateMachine interface.
-func (sm *replicaStateMachine) NewBatch() apply.Batch {
+func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch {
 	r := sm.r
+	if ephemeral {
+		mb := &sm.ephemeralBatch
+		mb.r = r
+		r.mu.RLock()
+		mb.state = r.mu.state
+		r.mu.RUnlock()
+		return mb
+	}
 	b := &sm.batch
 	b.r = r
 	b.sm = sm
@@ -775,6 +785,35 @@ func (b *replicaAppBatch) Close() {
 	*b = replicaAppBatch{}
 }
 
+// ephemeralReplicaAppBatch implements the apply.Batch interface.
+//
+// The batch performs the bare-minimum amount of work to be able to
+// determine whether a replicated command should be rejected or applied.
+type ephemeralReplicaAppBatch struct {
+	r     *Replica
+	state storagepb.ReplicaState
+}
+
+// Stage implements the apply.Batch interface.
+func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
+	cmd := cmdI.(*replicatedCmd)
+	ctx := cmd.ctx
+
+	mb.r.shouldApplyCommand(ctx, cmd, &mb.state)
+	mb.state.LeaseAppliedIndex = cmd.leaseIndex
+	return cmd, nil
+}
+
+// ApplyToStateMachine implements the apply.Batch interface.
+func (mb *ephemeralReplicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
+	panic("cannot apply ephemeralReplicaAppBatch to state machine")
+}
+
+// Close implements the apply.Batch interface.
+func (mb *ephemeralReplicaAppBatch) Close() {
+	*mb = ephemeralReplicaAppBatch{}
+}
+
 // ApplySideEffects implements the apply.StateMachine interface. The method
 // handles the third phase of applying a command to the replica state machine.
 //
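The ephemeral batch above is easiest to see in miniature. The following is a
self-contained sketch with hypothetical names (it is not the replica code,
which delegates the real check to Replica.shouldApplyCommand): commands are
staged against a private in-memory copy of the state to learn their rejection
status, and the batch can never be committed durably.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a hypothetical stand-in for the replica state consulted by the
// below-raft checks (here reduced to just the lease applied index).
type state struct{ leaseAppliedIndex uint64 }

// ephemeralBatch checks commands against a private copy of the state. It can
// decide acceptance or rejection, but it can never be committed.
type ephemeralBatch struct{ s state }

// stage reports whether a command with the given lease index would be
// rejected, updating only the batch's in-memory copy of the state.
func (b *ephemeralBatch) stage(leaseIndex uint64) (rejected bool) {
	if leaseIndex <= b.s.leaseAppliedIndex {
		return true // would be rejected beneath raft (e.g. a reproposal)
	}
	b.s.leaseAppliedIndex = leaseIndex
	return false
}

// commit exists only to show that an ephemeral batch must never reach the
// durable state machine.
func (b *ephemeralBatch) commit() error {
	return errors.New("cannot commit an ephemeral batch")
}

func main() {
	b := &ephemeralBatch{s: state{leaseAppliedIndex: 10}}
	for _, idx := range []uint64{11, 11, 12} {
		fmt.Printf("lease index %d rejected=%t\n", idx, b.stage(idx))
	}
	fmt.Println(b.commit())
}
```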
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 62b3852e586f..35ab1b7b4364 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -546,6 +546,40 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		}
 	}
 
+	// If the ready struct includes entries that have been committed, these
+	// entries will be applied to the Replica's replicated state machine down
+	// below, after appending new entries to the raft log and sending messages
+	// to peers. However, the process of appending new entries to the raft log
+	// and then applying committed entries to the state machine can take some
+	// time - and these entries are already durably committed. If they have
+	// clients waiting on them, we'd like to acknowledge their success as soon
+	// as possible. To facilitate this, we take a quick pass over the committed
+	// entries and acknowledge as many as we can trivially prove will not be
+	// rejected beneath raft.
+	//
+	// Note that we only acknowledge up to the current last index in the Raft
+	// log. The CommittedEntries slice may contain entries that are also in the
+	// Entries slice (to be appended in this ready pass), and we don't want to
+	// acknowledge them until they are durably in our local Raft log. This is
+	// most common in single node replication groups, but it is possible when a
+	// follower in a multi-node replication group is catching up after falling
+	// behind. In the first case, the entries are not yet committed so
+	// acknowledging them would be a lie. In the second case, the entries are
+	// committed so we could acknowledge them at this point, but doing so seems
+	// risky. To avoid complications in either case, we pass lastIndex for the
+	// maxIndex argument to AckCommittedEntriesBeforeApplication.
+	sm := r.getStateMachine()
+	dec := r.getDecoder()
+	appTask := apply.MakeTask(sm, dec)
+	appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
+	defer appTask.Close()
+	if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
+		return stats, err.(*nonDeterministicFailure).safeExpl, err
+	}
+	if err := appTask.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil {
+		return stats, err.(*nonDeterministicFailure).safeExpl, err
+	}
+
 	// Separate the MsgApp messages from all other Raft message types so that we
 	// can take advantage of the optimization discussed in the Raft thesis under
 	// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
@@ -721,14 +755,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 
 	applicationStart := timeutil.Now()
 	if len(rd.CommittedEntries) > 0 {
-		sm := r.getStateMachine()
-		dec := r.getDecoder()
-		appTask := apply.MakeTask(sm, dec)
-		appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
-		defer appTask.Close()
-		if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
-			return stats, err.(*nonDeterministicFailure).safeExpl, err
-		}
 		if err := appTask.ApplyCommittedEntries(ctx); err != nil {
 			return stats, err.(*nonDeterministicFailure).safeExpl, err
 		}
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 6063ffa66aaf..8278ee487188 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -2487,12 +2487,10 @@ func TestStoreScanMultipleIntents(t *testing.T) {
 	// in a single batch.
 	manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1)
 
-	// Query the range with a single INCONSISTENT scan, which should
-	// cause all intents to be resolved.
+	// Query the range with a single scan, which should cause all intents
+	// to be resolved.
 	sArgs := scanArgs(key1, key10.Next())
-	if _, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{
-		ReadConsistency: roachpb.INCONSISTENT,
-	}, &sArgs); pErr != nil {
+	if _, pErr := client.SendWrapped(context.Background(), store.TestSender(), &sArgs); pErr != nil {
 		t.Fatal(pErr)
 	}
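As a footnote to the handleRaftReadyRaftMuLocked comment above: the reason
lastIndex is passed as the maxIndex bound is easiest to see in isolation. The
helper below is hypothetical (not part of this patch); it simply restates the
rule that early acknowledgement stops at the highest index durably written to
the local Raft log.

```go
package main

import "fmt"

// ackableUpTo illustrates the maxIndex bound: of the committed indexes handed
// back by raft, only those already durably appended to this replica's own log
// (<= lastDurableIndex) are candidates for early acknowledgement; the rest
// must wait for a later ready iteration.
func ackableUpTo(committed []uint64, lastDurableIndex uint64) []uint64 {
	var ackable []uint64
	for _, idx := range committed {
		if idx > lastDurableIndex {
			break
		}
		ackable = append(ackable, idx)
	}
	return ackable
}

func main() {
	// Committed entries 5-8 arrived in the Ready, but only up to index 6 has
	// been durably written to the local log so far.
	fmt.Println(ackableUpTo([]uint64{5, 6, 7, 8}, 6)) // [5 6]
}
```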