From 5937762be92907b461a72286e4830b785ba521a1 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Thu, 1 Aug 2019 21:31:31 -0400
Subject: [PATCH] [DNM] storage/apply: support acknowledging committed raft entries
 before application

This will be pulled out into #38954 instead.

Release note: None
---
 pkg/storage/apply/cmd.go                      |   7 ++
 pkg/storage/apply/doc.go                      |   6 +-
 pkg/storage/apply/doc_test.go                 |  43 ++++++-
 pkg/storage/apply/task.go                     | 112 +++++++++++++++++-
 pkg/storage/apply/task_test.go                |  98 +++++++++++++--
 pkg/storage/replica_application_cmd.go        |  38 ++++++
 .../replica_application_state_machine.go      |  43 ++++++-
 pkg/storage/replica_raft.go                   |  38 ++++--
 8 files changed, 361 insertions(+), 24 deletions(-)

diff --git a/pkg/storage/apply/cmd.go b/pkg/storage/apply/cmd.go
index de2921e4430e..8ad0dab00539 100644
--- a/pkg/storage/apply/cmd.go
+++ b/pkg/storage/apply/cmd.go
@@ -31,6 +31,13 @@ type CheckedCommand interface {
 	Command
 	// Rejected returns whether the command was rejected.
 	Rejected() bool
+	// CanAckBeforeApplication returns whether the success of the command
+	// can be acknowledged before the command has been applied to the state
+	// machine.
+	CanAckBeforeApplication() bool
+	// AckSuccess acknowledges the success of the command to its client.
+	// Must only be called if !Rejected.
+	AckSuccess() error
 }
 
 // AppliedCommand is a command that has been applied to the replicated state
diff --git a/pkg/storage/apply/doc.go b/pkg/storage/apply/doc.go
index 2e9862daa20b..c9e17c8bd0d7 100644
--- a/pkg/storage/apply/doc.go
+++ b/pkg/storage/apply/doc.go
@@ -89,7 +89,11 @@ evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the
 completion of that change, client responses are determined before replication
 begins. The only remaining work to be done after replication of a command
 succeeds is to determine whether it will be rejected and replaced by an empty
-command.
+command. To facilitate this acknowledgement as early as possible, this package
+provides the ability to acknowledge a series of commands before applying them to
+the state machine. Outcomes are determined before performing any durable work by
+stepping commands through an in-memory "ephemeral" copy of the state machine.
+For more, see Task.AckCommittedEntriesBeforeApplication.
 
 A final challenge comes from the desire to properly prioritize the application
 of commands across multiple state machines in systems like CockroachDB where
diff --git a/pkg/storage/apply/doc_test.go b/pkg/storage/apply/doc_test.go
index 4cac97a4cad1..5cd76018a7ee 100644
--- a/pkg/storage/apply/doc_test.go
+++ b/pkg/storage/apply/doc_test.go
@@ -29,22 +29,46 @@ func ExampleTask() {
 	dec.nonLocal[6] = true
 	dec.shouldReject[3] = true
 	dec.shouldReject[6] = true
+	fmt.Print(`
+Setting up a batch of seven log entries:
+ - index 2 and 6 are non-local
+ - index 3 and 6 will be rejected
+ - index 5 is not trivial
+`)
 
 	t := apply.MakeTask(sm, dec)
 	defer t.Close()
 
-	fmt.Println("Decode:")
+	fmt.Println("\nDecode (note that index 2 and 6 are not local):")
 	if err := t.Decode(ctx, ents); err != nil {
 		panic(err)
 	}
 
+	fmt.Println("\nAckCommittedEntriesBeforeApplication:")
+	if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
+		panic(err)
+	}
+	fmt.Print(`
+Above, only index 1 and 4 get acked early. The command at 5 is
+non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
+must be in the first batch to qualify for acking early. 2 is not local
+(so there's nobody to ack), and 3 is rejected.
We can't ack rejected +commands early because the state machine is free to handle them any way +it likes. +`) + fmt.Println("\nApplyCommittedEntries:") if err := t.ApplyCommittedEntries(ctx); err != nil { panic(err) } // Output: // - // Decode: + // Setting up a batch of seven log entries: + // - index 2 and 6 are non-local + // - index 3 and 6 will be rejected + // - index 5 is not trivial + // + // Decode (note that index 2 and 6 are not local): // decoding command 1; local=true // decoding command 2; local=false // decoding command 3; local=true @@ -53,16 +77,27 @@ func ExampleTask() { // decoding command 6; local=false // decoding command 7; local=true // + // AckCommittedEntriesBeforeApplication: + // acknowledging command 1 before application + // acknowledging command 4 before application + // + // Above, only index 1 and 4 get acked early. The command at 5 is + // non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry + // must be in the first batch to qualify for acking early. 2 is not local + // (so there's nobody to ack), and 3 is rejected. We can't ack rejected + // commands early because the state machine is free to handle them any way + // it likes. + // // ApplyCommittedEntries: // committing batch with commands=[1 2 3 4] // applying side-effects of command 1 // applying side-effects of command 2 // applying side-effects of command 3 // applying side-effects of command 4 - // finishing and acknowledging command 1; rejected=false + // finishing command 1; rejected=false // finishing and acknowledging command 2; rejected=false // finishing and acknowledging command 3; rejected=true - // finishing and acknowledging command 4; rejected=false + // finishing command 4; rejected=false // committing batch with commands=[5] // applying side-effects of command 5 // finishing and acknowledging command 5; rejected=false diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go index 50db9628b8ef..bcd8625b41b1 100644 --- a/pkg/storage/apply/task.go +++ b/pkg/storage/apply/task.go @@ -29,7 +29,15 @@ type StateMachine interface { // effects that a group of Commands will have on the replicated state // machine. Commands are staged in the batch one-by-one and then the // entire batch is committed at once. - NewBatch() Batch + // + // Batch comes in two flavors - real batches and ephemeral batches. + // Real batches are capable of accumulating updates from commands and + // applying them to the state machine. Ephemeral batches are not able + // to make changes to the durable state machine, but can still be used + // for the purpose of checking commands to determine whether they will + // be rejected or not when staged in a real batch. The principal user + // of ephemeral batches is AckCommittedEntriesBeforeApplication. + NewBatch(ephemeral bool) Batch // ApplySideEffects applies the in-memory side-effects of a Command to // the replicated state machine. The method will be called in the order // that the commands are committed to the state machine's log. It will @@ -107,6 +115,106 @@ func (t *Task) assertDecoded() { } } +// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of +// raft entries that have been durably committed to the raft log but have not +// yet been applied to the proposer replica's replicated state machine. +// +// This is safe because a proposal through raft can be known to have succeeded +// as soon as it is durably replicated to a quorum of replicas (i.e. has +// committed in the raft log). 
The proposal does not need to wait for the
+// effects of the proposal to be applied in order to know whether its changes
+// will succeed or fail. This is because the raft log is the provider of
+// atomicity and durability for replicated writes, not (ignoring log
+// truncation) the replicated state machine itself.
+//
+// However, there are a few complications to acknowledging the success of a
+// proposal at this stage:
+//
+// 1. Committing an entry in the raft log and having the command in that entry
+// succeed are similar but not equivalent concepts. Even if the entry succeeds
+// in achieving durability by replicating to a quorum of replicas, its command
+// may still be rejected "beneath raft". This means that a (deterministic)
+// check after replication decides that the command will not be applied to the
+// replicated state machine. In that case, the client waiting on the result of
+// the command should not be informed of its success. Luckily, this check is
+// cheap to perform so we can do it here and when applying the command.
+//
+// Determining whether the command will succeed or be rejected before applying
+// it for real is accomplished using an ephemeral batch. Commands are staged in
+// the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
+// immediately even though the ephemeral batch itself cannot be used to update
+// the durable state machine. Once the rejection status of each command is
+// determined, any successful commands that permit acknowledgement before
+// application (see CanAckBeforeApplication) are acknowledged. The ephemeral
+// batch is then thrown away.
+//
+// 2. Some commands perform non-trivial work such as updating Replica configuration
+// state or performing Range splits. In those cases, it's likely that the client
+// is interested in not only knowing whether it has succeeded in sequencing the
+// change in the raft log, but also in knowing when the change has gone into
+// effect. There's currently no exposed hook to ask for an acknowledgement only
+// after a command has been applied, so for simplicity the current implementation
+// only ever acks transactional writes before they have gone into effect. All
+// other commands wait until they have been applied to ack their client.
+//
+// 3. Even though we can determine whether a command has succeeded without applying
+// it, the effect of the command will not be visible to conflicting commands until
+// it is applied. Because of this, the client can be informed of the success of
+// a write at this point, but we cannot release that write's latches until the
+// write has applied. See ProposalData.signalProposalResult/finishApplication.
+//
+// 4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
+// haven't actually been appended to our own log. This is most common in single
+// node replication groups, but it is possible when a follower in a multi-node
+// replication group is catching up after falling behind. In the first case,
+// the entries are not yet committed so acknowledging them would be a lie. In
+// the second case, the entries are committed so we could acknowledge them at
+// this point, but doing so seems risky. To avoid complications in either case,
+// the method takes a maxIndex parameter that limits the indexes that it will
+// acknowledge. Typically, callers will supply the highest index that they have
+// durably written to their raft log for this upper bound.
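+// For example, if entries at indexes 10 through 12 are committed in a Ready
+// struct but the local raft log has only been durably appended up through
+// index 11, a caller passing maxIndex=11 ensures that the entry at index 12
+// is not acknowledged before it has been written locally.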
+//
+func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
+	t.assertDecoded()
+	if !t.anyLocal {
+		return nil // fast-path
+	}
+
+	// Create a new ephemeral application batch. All we're interested in is
+	// whether commands will be rejected or not when staged in a real batch.
+	batch := t.sm.NewBatch(true /* ephemeral */)
+	defer batch.Close()
+
+	iter := t.dec.NewCommandIter()
+	defer iter.Close()
+
+	// Collect a batch of trivial commands from the applier. Stop at the first
+	// non-trivial command or at the first command with an index above maxIndex.
+	batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
+		if cmd.Index() > maxIndex {
+			return false
+		}
+		return cmd.IsTrivial()
+	})
+
+	// Stage the commands in the (ephemeral) batch.
+	stagedIter, err := mapCmdIter(batchIter, batch.Stage)
+	if err != nil {
+		return err
+	}
+
+	// Acknowledge any locally-proposed commands that succeeded in being staged
+	// in the batch and can be acknowledged before they are actually applied.
+	// Don't acknowledge rejected proposals early because the StateMachine may
+	// want to retry the command instead of returning the error to the client.
+	return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error {
+		if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
+			return cmd.AckSuccess()
+		}
+		return nil
+	})
+}
+
 // SetMaxBatchSize sets the maximum application batch size. If 0, no limit
 // will be placed on the number of commands that can be applied in a batch.
 func (t *Task) SetMaxBatchSize(size int) {
@@ -134,7 +242,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
 // b) exactly one non-trivial command
 func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
 	// Create a new application batch.
-	batch := t.sm.NewBatch()
+	batch := t.sm.NewBatch(false /* ephemeral */)
 	defer batch.Close()
 
 	// Consume a batch-worth of commands.
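A minimal sketch of how a caller is expected to drive the early-ack flow end
to end, for orientation while reviewing. Illustrative only, not part of the
patch: the handleCommitted name and its parameters are placeholders for state
that the real call site (handleRaftReadyRaftMuLocked, further down in this
patch) already has in hand.

package storage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/apply"
	"go.etcd.io/etcd/raft/raftpb"
)

// handleCommitted is a hypothetical driver of the apply.Task pipeline:
// decode once, ack what can be proven successful, then apply for real.
func handleCommitted(
	ctx context.Context,
	sm apply.StateMachine,
	dec apply.Decoder,
	ents []raftpb.Entry,
	lastAppendedIndex uint64,
) error {
	task := apply.MakeTask(sm, dec)
	defer task.Close()
	if err := task.Decode(ctx, ents); err != nil {
		return err
	}
	// Acknowledge successful, trivial, locally proposed commands up to the
	// highest index that is durably appended to the local raft log, before
	// performing any application work.
	if err := task.AckCommittedEntriesBeforeApplication(ctx, lastAppendedIndex); err != nil {
		return err
	}
	// Apply the same decoded commands for real, reusing the decoding work.
	return task.ApplyCommittedEntries(ctx)
}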
diff --git a/pkg/storage/apply/task_test.go b/pkg/storage/apply/task_test.go index 1903560f322c..166a7434704f 100644 --- a/pkg/storage/apply/task_test.go +++ b/pkg/storage/apply/task_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/raftpb" ) @@ -64,9 +65,15 @@ func (c *checkedCmd) AckSuccess() error { } func (c *appliedCmd) FinishAndAckOutcome() error { c.finished = true - c.acked = true - if logging { - fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected()) + if c.acked { + if logging { + fmt.Printf(" finishing command %d; rejected=%t\n", c.Index(), c.Rejected()) + } + } else { + if logging { + fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected()) + } + c.acked = true } return nil } @@ -114,12 +121,12 @@ func getTestStateMachine() *testStateMachine { return new(testStateMachine) } -func (sm *testStateMachine) NewBatch() apply.Batch { +func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch { if sm.batchOpen { panic("batch not closed") } sm.batchOpen = true - return &testBatch{sm: sm} + return &testBatch{sm: sm, ephemeral: ephemeral} } func (sm *testStateMachine) ApplySideEffects( cmdI apply.CheckedCommand, @@ -134,8 +141,9 @@ func (sm *testStateMachine) ApplySideEffects( } type testBatch struct { - sm *testStateMachine - staged []uint64 + sm *testStateMachine + ephemeral bool + staged []uint64 } func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { @@ -145,6 +153,9 @@ func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { return &ccmd, nil } func (b *testBatch) Commit(_ context.Context) error { + if b.ephemeral { + return errors.New("can't commit an ephemeral batch") + } b.sm.batches = append(b.sm.batches, b.staged) b.sm.applied = append(b.sm.applied, b.staged...) if logging { @@ -263,3 +274,76 @@ func TestApplyCommittedEntriesWithBatchSize(t *testing.T) { require.True(t, cmd.finished) } } + +func TestAckCommittedEntriesBeforeApplication(t *testing.T) { + ctx := context.Background() + ents := makeEntries(9) + + sm := getTestStateMachine() + dec := newTestDecoder() + dec.nonTrivial[6] = true + dec.nonTrivial[7] = true + dec.nonTrivial[9] = true + dec.nonLocal[2] = true + dec.shouldReject[3] = true + + // Use an apply.Task to ack all commands before applying them. + appT := apply.MakeTask(sm, dec) + defer appT.Close() + require.NoError(t, appT.Decode(ctx, ents)) + require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */)) + + // Assert that the state machine was not updated. + require.Equal(t, testStateMachine{}, *sm) + + // Assert that some commands were acknowledged early and that none were finished. + for _, cmd := range dec.cmds { + var exp bool + switch cmd.index { + case 1, 4, 5: + exp = true // local and successful + case 2: + exp = false // remote + case 3: + exp = false // local and rejected + default: + exp = false // after first non-trivial cmd + } + require.Equal(t, exp, cmd.acked) + require.False(t, cmd.finished) + } + + // Try again with a lower maximum log index. 
+	appT.Close()
+	ents = makeEntries(5)
+
+	dec = newTestDecoder()
+	dec.nonLocal[2] = true
+	dec.shouldReject[3] = true
+
+	appT = apply.MakeTask(sm, dec)
+	require.NoError(t, appT.Decode(ctx, ents))
+	require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))
+
+	// Assert that the state machine was not updated.
+	require.Equal(t, testStateMachine{}, *sm)
+
+	// Assert that some commands were acknowledged early and that none were finished.
+	for _, cmd := range dec.cmds {
+		var exp bool
+		switch cmd.index {
+		case 1, 4:
+			exp = true // local and successful
+		case 2:
+			exp = false // remote
+		case 3:
+			exp = false // local and rejected
+		case 5:
+			exp = false // index too high
+		default:
+			t.Fatalf("unexpected index %d", cmd.index)
+		}
+		require.Equal(t, exp, cmd.acked)
+		require.False(t, cmd.finished)
+	}
+}
diff --git a/pkg/storage/replica_application_cmd.go b/pkg/storage/replica_application_cmd.go
index a23162d599f9..778d42296e5b 100644
--- a/pkg/storage/replica_application_cmd.go
+++ b/pkg/storage/replica_application_cmd.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"go.etcd.io/etcd/raft/raftpb"
 )
 
@@ -110,6 +111,43 @@ func (c *replicatedCmd) Rejected() bool {
 	return c.forcedErr != nil
 }
 
+// CanAckBeforeApplication implements the apply.CheckedCommand interface.
+func (c *replicatedCmd) CanAckBeforeApplication() bool {
+	// CanAckBeforeApplication determines whether the request type is compatible
+	// with acknowledgement of success before it has been applied. For now, this
+	// determination is based on whether the request is performing transactional
+	// writes. This could be exposed as an option on the batch header instead.
+	//
+	// We don't try to ack async consensus writes before application because we
+	// know that there isn't a client waiting for the result.
+	req := c.proposal.Request
+	return req.IsTransactionWrite() && !req.AsyncConsensus
+}
+
+// AckSuccess implements the apply.CheckedCommand interface.
+func (c *replicatedCmd) AckSuccess() error {
+	if !c.IsLocal() {
+		return nil
+	}
+
+	// If the local command isn't already managing its own tracing span, fork
+	// the request's context so that it can outlive the proposer's context.
+	if c.proposal.sp == nil {
+		c.proposal.ctx, c.proposal.sp = tracing.ForkCtxSpan(c.ctx, "command application")
+		c.ctx = c.proposal.ctx
+	}
+
+	// Signal the proposal's response channel with the result.
+	// Make a copy of the response to avoid data races.
+	var resp proposalResult
+	reply := *c.proposal.Local.Reply
+	reply.Responses = append([]roachpb.ResponseUnion(nil), reply.Responses...)
+	resp.Reply = &reply
+	resp.Intents = c.proposal.Local.DetachIntents()
+	resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */)
+	c.proposal.signalProposalResult(resp)
+	return nil
+}
+
 // FinishAndAckOutcome implements the apply.AppliedCommand interface.
func (c *replicatedCmd) FinishAndAckOutcome() error { if !c.IsLocal() { diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index 79f846edc720..71898e692db8 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -105,8 +105,10 @@ func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped } // side-effects of each command is applied to the Replica's in-memory state. type replicaStateMachine struct { r *Replica - // batch is returned from NewBatch(). + // batch is returned from NewBatch(false /* ephemeral */). batch replicaAppBatch + // ephemeralBatch is returned from NewBatch(true /* ephemeral */). + ephemeralBatch ephemeralReplicaAppBatch // stats are updated during command application and reset by moveStats. stats applyCommittedEntriesStats } @@ -321,8 +323,16 @@ func checkForcedErr( } // NewBatch implements the apply.StateMachine interface. -func (sm *replicaStateMachine) NewBatch() apply.Batch { +func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch { r := sm.r + if ephemeral { + mb := &sm.ephemeralBatch + mb.r = r + r.mu.RLock() + mb.state = r.mu.state + r.mu.RUnlock() + return mb + } b := &sm.batch b.r = r b.sm = sm @@ -772,6 +782,35 @@ func (b *replicaAppBatch) Close() { *b = replicaAppBatch{} } +// ephemeralReplicaAppBatch implements the apply.Batch interface. +// +// The batch performs the bare-minimum amount of work to be able to +// determine whether a replicated command should be rejected or applied. +type ephemeralReplicaAppBatch struct { + r *Replica + state storagepb.ReplicaState +} + +// Stage implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { + cmd := cmdI.(*replicatedCmd) + ctx := cmd.ctx + + mb.r.shouldApplyCommand(ctx, cmd, &mb.state) + mb.state.LeaseAppliedIndex = cmd.leaseIndex + return cmd, nil +} + +// Commit implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) Commit(ctx context.Context) error { + panic("cannot commit ephemeralReplicaAppBatch") +} + +// Close implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) Close() { + *mb = ephemeralReplicaAppBatch{} +} + // ApplySideEffects implements the apply.StateMachine interface. The method // handles the third phase of applying a command to the replica state machine. // diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 7b28cbd36fd1..755110f6d4f7 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -543,6 +543,36 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + // If the ready struct includes entries that have been committed, these + // entries will be applied to the Replica's replicated state machine down + // below, after appending new entries to the raft log and sending messages + // to peers. However, the process of appending new entries to the raft log + // and then applying committed entries to the state machine can take some + // time - and these entries are already durably committed. If they have + // clients waiting on them, we'd like to acknowledge their success as soon + // as possible. To facilitate this, we take a quick pass over the committed + // entries and acknowledge as many as we can trivially prove will not be + // rejected beneath raft. + // + // We only try to ack committed entries before applying them if we have + // clients waiting on the result of Raft entries. 
If none of
+	// the committed entries were proposed locally, then there are no clients
+	// waiting on them and the early-ack pass is a cheap no-op.
+	//
+	// We share the entry decoder and command application task with the call
+	// to ApplyCommittedEntries down below to avoid duplicating any
+	// decoding work.
+	sm := r.getStateMachine()
+	dec := r.getDecoder()
+	appTask := apply.MakeTask(sm, dec)
+	appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
+	defer appTask.Close()
+	if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
+		return stats, err.(*nonDeterministicFailure).safeExpl, err
+	}
+	if err := appTask.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil {
+		return stats, err.(*nonDeterministicFailure).safeExpl, err
+	}
+
 	// Separate the MsgApp messages from all other Raft message types so that we
 	// can take advantage of the optimization discussed in the Raft thesis under
 	// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
@@ -718,14 +748,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 
 	applicationStart := timeutil.Now()
 	if len(rd.CommittedEntries) > 0 {
-		sm := r.getStateMachine()
-		dec := r.getDecoder()
-		appTask := apply.MakeTask(sm, dec)
-		appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
-		defer appTask.Close()
-		if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
-			return stats, err.(*nonDeterministicFailure).safeExpl, err
-		}
 		if err := appTask.ApplyCommittedEntries(ctx); err != nil {
 			return stats, err.(*nonDeterministicFailure).safeExpl, err
 		}
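Finally, to make the two batch flavors concrete, here is a self-contained toy
sketch of the contract that NewBatch(ephemeral) establishes, mirroring
testBatch and ephemeralReplicaAppBatch above. Illustrative only; every name in
it is invented for the example. An ephemeral batch may stage commands to
compute their rejection status, but it must never touch the durable state
machine, so committing one is a programming error.

package main

import (
	"errors"
	"fmt"
)

// toyStateMachine stands in for a durable replicated state machine.
type toyStateMachine struct {
	applied []uint64 // indexes durably applied
}

// toyBatch stands in for apply.Batch; the ephemeral flag selects the flavor.
type toyBatch struct {
	sm        *toyStateMachine
	ephemeral bool
	staged    []uint64
}

func (sm *toyStateMachine) NewBatch(ephemeral bool) *toyBatch {
	return &toyBatch{sm: sm, ephemeral: ephemeral}
}

// Stage runs the same deterministic checks in both flavors so that the
// rejection status computed ephemerally matches real application. This toy
// accepts every command.
func (b *toyBatch) Stage(index uint64) (rejected bool) {
	b.staged = append(b.staged, index)
	return false
}

// Commit persists staged updates; an ephemeral batch must never commit.
func (b *toyBatch) Commit() error {
	if b.ephemeral {
		return errors.New("cannot commit an ephemeral batch")
	}
	b.sm.applied = append(b.sm.applied, b.staged...)
	return nil
}

func main() {
	sm := &toyStateMachine{}

	// Ephemeral pass: determine outcomes, ack waiting clients, then throw
	// the batch away without committing it.
	eb := sm.NewBatch(true)
	for _, idx := range []uint64{1, 2, 3} {
		if rejected := eb.Stage(idx); !rejected {
			fmt.Printf("acking command %d before application\n", idx)
		}
	}

	// Real pass: stage the same commands again and commit them durably.
	rb := sm.NewBatch(false)
	for _, idx := range []uint64{1, 2, 3} {
		rb.Stage(idx)
	}
	if err := rb.Commit(); err != nil {
		panic(err)
	}
	fmt.Println("applied:", sm.applied)
}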