diff --git a/pkg/storage/apply/cmd.go b/pkg/storage/apply/cmd.go index 668a9ebe2c56..7ac74b17e2fb 100644 --- a/pkg/storage/apply/cmd.go +++ b/pkg/storage/apply/cmd.go @@ -33,6 +33,13 @@ type CheckedCommand interface { Command // Rejected returns whether the command was rejected. Rejected() bool + // CanAckBeforeApplication returns whether the success of the command + // can be acknowledged before the command has been applied to the state + // machine. + CanAckBeforeApplication() bool + // AckSuccess acknowledges the success of the command to its client. + // Must only be called if !Rejected. + AckSuccess() error } // AppliedCommand is a command that has been applied to the replicated state @@ -182,6 +189,19 @@ func mapCheckedCmdIter( return ret, nil } +// forEachCheckedCmdIter calls a closure on each command in the provided +// iterator. The function closes the provided iterator. +func forEachCheckedCmdIter(iter CheckedCommandIterator, fn func(CheckedCommand) error) error { + defer iter.Close() + for iter.Valid() { + if err := fn(iter.CurChecked()); err != nil { + return err + } + iter.Next() + } + return nil +} + // forEachAppliedCmdIter calls a closure on each command in the provided // iterator. The function closes the provided iterator. func forEachAppliedCmdIter(iter AppliedCommandIterator, fn func(AppliedCommand) error) error { diff --git a/pkg/storage/apply/doc.go b/pkg/storage/apply/doc.go index bf8d51bc4eee..c23e83ae7efc 100644 --- a/pkg/storage/apply/doc.go +++ b/pkg/storage/apply/doc.go @@ -89,7 +89,11 @@ evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the completion of that change, client responses are determined before replication begins. The only remaining work to be done after replication of a command succeeds is to determine whether it will be rejected and replaced by an empty -command. +command. To facilitate this acknowledgement as early as possible, this package +provides the ability to acknowledge a series of commands before applying them to +the state machine. Outcomes are determined before performing any durable work by +stepping commands through an in-memory "ephemeral" copy of the state machine. +For more, see Task.AckCommittedEntriesBeforeApplication. A final challenge comes from the desire to properly prioritize the application of commands across multiple state machines in systems like CockroachDB where diff --git a/pkg/storage/apply/doc_test.go b/pkg/storage/apply/doc_test.go index 1996ea973deb..7ef44292ca48 100644 --- a/pkg/storage/apply/doc_test.go +++ b/pkg/storage/apply/doc_test.go @@ -29,22 +29,46 @@ func ExampleTask() { dec.nonLocal[6] = true dec.shouldReject[3] = true dec.shouldReject[6] = true + fmt.Print(` +Setting up a batch of seven log entries: + - index 2 and 6 are non-local + - index 3 and 6 will be rejected + - index 5 is not trivial +`) t := apply.MakeTask(sm, dec) defer t.Close() - fmt.Println("Decode:") + fmt.Println("\nDecode (note that index 2 and 6 are not local):") if err := t.Decode(ctx, ents); err != nil { panic(err) } + fmt.Println("\nAckCommittedEntriesBeforeApplication:") + if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil { + panic(err) + } + fmt.Print(` +Above, only index 1 and 4 get acked early. The command at 5 is +non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry +must be in the first batch to qualify for acking early. 2 is not local +(so there's nobody to ack), and 3 is rejected. 
We can't ack rejected +commands early because the state machine is free to handle them any way +it likes. +`) + fmt.Println("\nApplyCommittedEntries:") if err := t.ApplyCommittedEntries(ctx); err != nil { panic(err) } // Output: // - // Decode: + // Setting up a batch of seven log entries: + // - index 2 and 6 are non-local + // - index 3 and 6 will be rejected + // - index 5 is not trivial + // + // Decode (note that index 2 and 6 are not local): // decoding command 1; local=true // decoding command 2; local=false // decoding command 3; local=true @@ -53,16 +77,27 @@ func ExampleTask() { // decoding command 6; local=false // decoding command 7; local=true // + // AckCommittedEntriesBeforeApplication: + // acknowledging command 1 before application + // acknowledging command 4 before application + // + // Above, only index 1 and 4 get acked early. The command at 5 is + // non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry + // must be in the first batch to qualify for acking early. 2 is not local + // (so there's nobody to ack), and 3 is rejected. We can't ack rejected + // commands early because the state machine is free to handle them any way + // it likes. + // // ApplyCommittedEntries: // applying batch with commands=[1 2 3 4] // applying side-effects of command 1 // applying side-effects of command 2 // applying side-effects of command 3 // applying side-effects of command 4 - // finishing and acknowledging command 1; rejected=false + // finishing command 1; rejected=false // finishing and acknowledging command 2; rejected=false // finishing and acknowledging command 3; rejected=true - // finishing and acknowledging command 4; rejected=false + // finishing command 4; rejected=false // applying batch with commands=[5] // applying side-effects of command 5 // finishing and acknowledging command 5; rejected=false diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go index 681d86779c50..8bfa62acd0a1 100644 --- a/pkg/storage/apply/task.go +++ b/pkg/storage/apply/task.go @@ -29,7 +29,15 @@ type StateMachine interface { // effects that a group of Commands will have on the replicated state // machine. Commands are staged in the batch one-by-one and then the // entire batch is committed at once. - NewBatch() Batch + // + // Batch comes in two flavors - real batches and ephemeral batches. + // Real batches are capable of accumulating updates from commands and + // applying them to the state machine. Ephemeral batches are not able + // to make changes to the durable state machine, but can still be used + // for the purpose of checking commands to determine whether they will + // be rejected or not when staged in a real batch. The principal user + // of ephemeral batches is AckCommittedEntriesBeforeApplication. + NewBatch(ephemeral bool) Batch // ApplySideEffects applies the in-memory side-effects of a Command to // the replicated state machine. The method will be called in the order // that the commands are committed to the state machine's log. Once the @@ -117,6 +125,106 @@ func (t *Task) assertDecoded() { } } +// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of +// raft entries that have been durably committed to the raft log but have not +// yet been applied to the proposer replica's replicated state machine. +// +// This is safe because a proposal through raft can be known to have succeeded +// as soon as it is durably replicated to a quorum of replicas (i.e. has +// committed in the raft log). 
The proposal does not need to wait for the
+// effects of the proposal to be applied in order to know whether its changes
+// will succeed or fail. This is because the raft log is the provider of
+// atomicity and durability for replicated writes, not (ignoring log
+// truncation) the replicated state machine itself.
+//
+// However, there are a few complications to acknowledging the success of a
+// proposal at this stage:
+//
+// 1. Committing an entry in the raft log and having the command in that entry
+// succeed are similar but not equivalent concepts. Even if the entry succeeds
+// in achieving durability by replicating to a quorum of replicas, its command
+// may still be rejected "beneath raft". This means that a (deterministic)
+// check after replication decides that the command will not be applied to the
+// replicated state machine. In that case, the client waiting on the result of
+// the command should not be informed of its success. Luckily, this check is
+// cheap to perform so we can do it here and when applying the command.
+//
+// Determining whether the command will succeed or be rejected before applying
+// it for real is accomplished using an ephemeral batch. Commands are staged in
+// the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
+// immediately even though the ephemeral batch itself cannot be used to update
+// the durable state machine. Once the rejection status of each command is
+// determined, any successful commands that permit acknowledgement before
+// application (see CanAckBeforeApplication) are acknowledged. The ephemeral
+// batch is then thrown away.
+//
+// 2. Some commands perform non-trivial work such as updating Replica configuration
+// state or performing Range splits. In those cases, it's likely that the client
+// is interested in not only knowing whether it has succeeded in sequencing the
+// change in the raft log, but also in knowing when the change has gone into
+// effect. There's currently no exposed hook to ask for an acknowledgement only
+// after a command has been applied, so for simplicity the current implementation
+// only ever acks transactional writes before they have gone into effect. All
+// other commands wait until they have been applied to ack their client.
+//
+// 3. Even though we can determine whether a command has succeeded without applying
+// it, the effect of the command will not be visible to conflicting commands until
+// it is applied. Because of this, the client can be informed of the success of
+// a write at this point, but we cannot release that write's latches until the
+// write has applied. See ProposalData.signalProposalResult/finishApplication.
+//
+// 4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
+// haven't actually been appended to our own log. This is most common in single
+// node replication groups, but it is possible when a follower in a multi-node
+// replication group is catching up after falling behind. In the first case,
+// the entries are not yet committed so acknowledging them would be a lie. In
+// the second case, the entries are committed so we could acknowledge them at
+// this point, but doing so seems risky. To avoid complications in either case,
+// the method takes a maxIndex parameter that limits the indexes that it will
+// acknowledge. Typically, callers will supply the highest index that they have
+// durably written to their raft log for this upper bound.
+// +func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error { + t.assertDecoded() + if !t.anyLocal { + return nil // fast-path + } + + // Create a new ephemeral application batch. All we're interested in is + // whether commands will be rejected or not when staged in a real batch. + batch := t.sm.NewBatch(true /* ephemeral */) + defer batch.Close() + + iter := t.dec.NewCommandIter() + defer iter.Close() + + // Collect a batch of trivial commands from the applier. Stop at the first + // non-trivial command or at the first command with an index above maxIndex. + batchIter := takeWhileCmdIter(iter, func(cmd Command) bool { + if cmd.Index() > maxIndex { + return false + } + return cmd.IsTrivial() + }) + + // Stage the commands in the (ephemeral) batch. + stagedIter, err := mapCmdIter(batchIter, batch.Stage) + if err != nil { + return err + } + + // Acknowledge any locally-proposed commands that succeeded in being staged + // in the batch and can be acknowledged before they are actually applied. + // Don't acknowledge rejected proposals early because the StateMachine may + // want to retry the command instead of returning the error to the client. + return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error { + if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() { + return cmd.AckSuccess() + } + return nil + }) +} + // SetMaxBatchSize sets the maximum application batch size. If 0, no limit // will be placed on the number of commands that can be applied in a batch. func (t *Task) SetMaxBatchSize(size int) { @@ -144,7 +252,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error { // b) exactly one non-trivial command func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error { // Create a new application batch. - batch := t.sm.NewBatch() + batch := t.sm.NewBatch(false /* ephemeral */) defer batch.Close() // Consume a batch-worth of commands. 
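For orientation, here is a minimal sketch (not part of the patch) of the call sequence the new API is designed for, mirroring the usage in doc_test.go and replica_raft.go: decode the committed entries, acknowledge what can safely be acked early given the durably-synced log index, then apply for real. The handleCommittedEntries helper is a hypothetical name and the apply.Decoder parameter type is assumed to be the decoder interface accepted by MakeTask; only the Task methods added in this change are relied upon.

// Illustrative sketch only; not part of this change.
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/apply"
	"go.etcd.io/etcd/raft/raftpb"
)

// handleCommittedEntries shows the two-phase flow: acknowledge eligible
// commands as soon as their log position is durable, then apply all
// committed entries to the state machine.
func handleCommittedEntries(
	ctx context.Context,
	sm apply.StateMachine,
	dec apply.Decoder, // assumed: the decoder interface taken by MakeTask
	committed []raftpb.Entry,
	durableLastIndex uint64, // highest index already synced to the local raft log
) error {
	t := apply.MakeTask(sm, dec)
	defer t.Close()
	if err := t.Decode(ctx, committed); err != nil {
		return err
	}
	// Early acks: trivial, locally-proposed, non-rejected commands at or
	// below durableLastIndex are acknowledged via AckSuccess, using an
	// ephemeral batch under the hood.
	if err := t.AckCommittedEntriesBeforeApplication(ctx, durableLastIndex); err != nil {
		return err
	}
	// Durable application: real batches are created with NewBatch(false)
	// and committed via ApplyToStateMachine.
	return t.ApplyCommittedEntries(ctx)
}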
diff --git a/pkg/storage/apply/task_test.go b/pkg/storage/apply/task_test.go index 322b058e7233..d68970f2963f 100644 --- a/pkg/storage/apply/task_test.go +++ b/pkg/storage/apply/task_test.go @@ -16,6 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/raftpb" ) @@ -50,15 +51,29 @@ type appliedCmd struct { *checkedCmd } -func (c *cmd) Index() uint64 { return c.index } -func (c *cmd) IsTrivial() bool { return !c.nonTrivial } -func (c *cmd) IsLocal() bool { return !c.nonLocal } -func (c *checkedCmd) Rejected() bool { return c.rejected } -func (c *appliedCmd) FinishAndAckOutcome() error { - c.finished = true +func (c *cmd) Index() uint64 { return c.index } +func (c *cmd) IsTrivial() bool { return !c.nonTrivial } +func (c *cmd) IsLocal() bool { return !c.nonLocal } +func (c *checkedCmd) Rejected() bool { return c.rejected } +func (c *checkedCmd) CanAckBeforeApplication() bool { return true } +func (c *checkedCmd) AckSuccess() error { c.acked = true if logging { - fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected()) + fmt.Printf(" acknowledging command %d before application\n", c.Index()) + } + return nil +} +func (c *appliedCmd) FinishAndAckOutcome() error { + c.finished = true + if c.acked { + if logging { + fmt.Printf(" finishing command %d; rejected=%t\n", c.Index(), c.Rejected()) + } + } else { + if logging { + fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected()) + } + c.acked = true } return nil } @@ -106,12 +121,12 @@ func getTestStateMachine() *testStateMachine { return new(testStateMachine) } -func (sm *testStateMachine) NewBatch() apply.Batch { +func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch { if sm.batchOpen { panic("batch not closed") } sm.batchOpen = true - return &testBatch{sm: sm} + return &testBatch{sm: sm, ephemeral: ephemeral} } func (sm *testStateMachine) ApplySideEffects( cmdI apply.CheckedCommand, @@ -126,8 +141,9 @@ func (sm *testStateMachine) ApplySideEffects( } type testBatch struct { - sm *testStateMachine - staged []uint64 + sm *testStateMachine + ephemeral bool + staged []uint64 } func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { @@ -137,6 +153,9 @@ func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { return &ccmd, nil } func (b *testBatch) ApplyToStateMachine(_ context.Context) error { + if b.ephemeral { + return errors.New("can't commit an ephemeral batch") + } b.sm.batches = append(b.sm.batches, b.staged) b.sm.applied = append(b.sm.applied, b.staged...) if logging { @@ -255,3 +274,76 @@ func TestApplyCommittedEntriesWithBatchSize(t *testing.T) { require.True(t, cmd.finished) } } + +func TestAckCommittedEntriesBeforeApplication(t *testing.T) { + ctx := context.Background() + ents := makeEntries(9) + + sm := getTestStateMachine() + dec := newTestDecoder() + dec.nonTrivial[6] = true + dec.nonTrivial[7] = true + dec.nonTrivial[9] = true + dec.nonLocal[2] = true + dec.shouldReject[3] = true + + // Use an apply.Task to ack all commands before applying them. + appT := apply.MakeTask(sm, dec) + defer appT.Close() + require.NoError(t, appT.Decode(ctx, ents)) + require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */)) + + // Assert that the state machine was not updated. 
+ require.Equal(t, testStateMachine{}, *sm) + + // Assert that some commands were acknowledged early and that none were finished. + for _, cmd := range dec.cmds { + var exp bool + switch cmd.index { + case 1, 4, 5: + exp = true // local and successful + case 2: + exp = false // remote + case 3: + exp = false // local and rejected + default: + exp = false // after first non-trivial cmd + } + require.Equal(t, exp, cmd.acked) + require.False(t, cmd.finished) + } + + // Try again with a lower maximum log index. + appT.Close() + ents = makeEntries(5) + + dec = newTestDecoder() + dec.nonLocal[2] = true + dec.shouldReject[3] = true + + appT = apply.MakeTask(sm, dec) + require.NoError(t, appT.Decode(ctx, ents)) + require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */)) + + // Assert that the state machine was not updated. + require.Equal(t, testStateMachine{}, *sm) + + // Assert that some commands were acknowledged early and that none were finished. + for _, cmd := range dec.cmds { + var exp bool + switch cmd.index { + case 1, 4: + exp = true // local and successful + case 2: + exp = false // remote + case 3: + exp = false // local and rejected + case 5: + exp = false // index too high + default: + t.Fatalf("unexpected index %d", cmd.index) + } + require.Equal(t, exp, cmd.acked) + require.False(t, cmd.finished) + } +} diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 15f487ca7fbb..be1867910313 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -4554,3 +4554,107 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing require.Nil(t, pErr, "expected to succeed after healing the partition") mtc.waitForValues(keyA, []int64{3, 3, 3}) } + +// TestAckWriteBeforeApplication tests that the success of transactional writes +// is acknowledged after those writes have been committed to a Range's Raft log +// but before those writes have been applied to its replicated state machine. +func TestAckWriteBeforeApplication(t *testing.T) { + defer leaktest.AfterTest(t)() + for _, tc := range []struct { + repls int + expAckBeforeAppl bool + }{ + // In a single-replica Range, each handleRaftReady iteration will append + // new entries to the Raft log and immediately apply them. This prevents + // "early acknowledgement" from being possible or useful. See the comment + // on apply.Task.AckCommittedEntriesBeforeApplication. + {1, false}, + // In a three-replica Range, each handleRaftReady iteration will append + // a set of entries to the Raft log and then apply the previous set of + // entries. This makes "early acknowledgement" a major optimization, as + // it pulls the entire latency required to append the next set of entries + // to the Raft log out of the client-perceived latency of the previous + // set of entries. 
{3, true},
+	} {
+		t.Run(fmt.Sprintf("numRepls=%d", tc.repls), func(t *testing.T) {
+			var filterActive int32
+			var magicTS hlc.Timestamp
+			blockPreApplication, blockPostApplication := make(chan struct{}), make(chan struct{})
+			applyFilterFn := func(ch chan struct{}) storagebase.ReplicaApplyFilter {
+				return func(filterArgs storagebase.ApplyFilterArgs) (int, *roachpb.Error) {
+					if atomic.LoadInt32(&filterActive) == 1 && filterArgs.Timestamp == magicTS {
+						<-ch
+					}
+					return 0, nil
+				}
+			}
+
+			tsc := storage.TestStoreConfig(nil)
+			tsc.TestingKnobs.TestingApplyFilter = applyFilterFn(blockPreApplication)
+			tsc.TestingKnobs.TestingPostApplyFilter = applyFilterFn(blockPostApplication)
+
+			mtc := &multiTestContext{storeConfig: &tsc}
+			defer mtc.Stop()
+			mtc.Start(t, tc.repls)
+
+			// Replicate the Range, if necessary.
+			key := roachpb.Key("a")
+			rangeID := mtc.stores[0].LookupReplica(roachpb.RKey(key)).RangeID
+			for i := 1; i < tc.repls; i++ {
+				mtc.replicateRange(rangeID, i)
+			}
+
+			// Begin performing a write on the Range.
+			magicTS = mtc.stores[0].Clock().Now()
+			atomic.StoreInt32(&filterActive, 1)
+			ch := make(chan *roachpb.Error, 1)
+			go func() {
+				ctx := context.Background()
+				put := putArgs(key, []byte("val"))
+				_, pErr := client.SendWrappedWith(ctx, mtc.stores[0].TestSender(), roachpb.Header{
+					Timestamp: magicTS,
+				}, put)
+				ch <- pErr
+			}()
+
+			expResult := func() {
+				t.Helper()
+				if pErr := <-ch; pErr != nil {
+					t.Fatalf("unexpected proposal result error: %v", pErr)
+				}
+			}
+			dontExpResult := func() {
+				t.Helper()
+				select {
+				case <-time.After(10 * time.Millisecond):
+					// Expected.
+				case pErr := <-ch:
+					t.Fatalf("unexpected proposal acknowledged before TestingApplyFilter: %v", pErr)
+				}
+			}
+
+			// The result should be blocked on the pre-apply filter.
+			dontExpResult()
+
+			// Release the pre-apply filter.
+			close(blockPreApplication)
+			// Depending on the cluster configuration, the result may not be blocked
+			// on the post-apply filter because the write may be acknowledged to the
+			// client before it is applied.
+			if tc.expAckBeforeAppl {
+				expResult()
+			} else {
+				dontExpResult()
+			}
+
+			// Stop blocking Raft application to allow everything to shut down cleanly.
+			// This also confirms that the proposal does eventually apply.
+			close(blockPostApplication)
+			// If we didn't expect an acknowledgement before, we do now.
+			if !tc.expAckBeforeAppl {
+				expResult()
+			}
+		})
+	}
+}
diff --git a/pkg/storage/replica_application_cmd.go b/pkg/storage/replica_application_cmd.go index 5524bcc0c47a..7712eb826917 100644 --- a/pkg/storage/replica_application_cmd.go +++ b/pkg/storage/replica_application_cmd.go @@ -116,6 +116,39 @@ func (c *replicatedCmd) Rejected() bool { return c.forcedErr != nil } +// CanAckBeforeApplication implements the apply.CheckedCommand interface. +func (c *replicatedCmd) CanAckBeforeApplication() bool { + // CanAckBeforeApplication determines whether the request type is compatible + // with acknowledgement of success before it has been applied. For now, this + // determination is based on whether the request is performing transactional + // writes. This could be exposed as an option on the batch header instead. + // + // We don't try to ack async consensus writes before application because we + // know that there isn't a client waiting for the result. + req := c.proposal.Request + return req.IsTransactionWrite() && !req.AsyncConsensus +} + +// AckSuccess implements the apply.CheckedCommand interface.
+func (c *replicatedCmd) AckSuccess() error { + if !c.IsLocal() { + return nil + } + + // Signal the proposal's response channel with the result. + // Make a copy of the response to avoid data races between client mutations + // of the response and use of the response in endCmds.done when the command + // is finished. + var resp proposalResult + reply := *c.proposal.Local.Reply + reply.Responses = append([]roachpb.ResponseUnion(nil), reply.Responses...) + resp.Reply = &reply + resp.Intents = c.proposal.Local.DetachIntents() + resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */) + c.proposal.signalProposalResult(resp) + return nil +} + // FinishAndAckOutcome implements the apply.AppliedCommand interface. func (c *replicatedCmd) FinishAndAckOutcome() error { tracing.FinishSpan(c.sp) diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index c9664188f3bf..02f3ffb889d2 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -105,8 +105,10 @@ func (e *nonDeterministicFailure) Unwrap() error { return e.wrapped } // side-effects of each command is applied to the Replica's in-memory state. type replicaStateMachine struct { r *Replica - // batch is returned from NewBatch(). + // batch is returned from NewBatch(false /* ephemeral */). batch replicaAppBatch + // ephemeralBatch is returned from NewBatch(true /* ephemeral */). + ephemeralBatch ephemeralReplicaAppBatch // stats are updated during command application and reset by moveStats. stats applyCommittedEntriesStats } @@ -321,8 +323,16 @@ func checkForcedErr( } // NewBatch implements the apply.StateMachine interface. -func (sm *replicaStateMachine) NewBatch() apply.Batch { +func (sm *replicaStateMachine) NewBatch(ephemeral bool) apply.Batch { r := sm.r + if ephemeral { + mb := &sm.ephemeralBatch + mb.r = r + r.mu.RLock() + mb.state = r.mu.state + r.mu.RUnlock() + return mb + } b := &sm.batch b.r = r b.sm = sm @@ -775,6 +785,35 @@ func (b *replicaAppBatch) Close() { *b = replicaAppBatch{} } +// ephemeralReplicaAppBatch implements the apply.Batch interface. +// +// The batch performs the bare-minimum amount of work to be able to +// determine whether a replicated command should be rejected or applied. +type ephemeralReplicaAppBatch struct { + r *Replica + state storagepb.ReplicaState +} + +// Stage implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { + cmd := cmdI.(*replicatedCmd) + ctx := cmd.ctx + + mb.r.shouldApplyCommand(ctx, cmd, &mb.state) + mb.state.LeaseAppliedIndex = cmd.leaseIndex + return cmd, nil +} + +// ApplyToStateMachine implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) ApplyToStateMachine(ctx context.Context) error { + panic("cannot apply ephemeralReplicaAppBatch to state machine") +} + +// Close implements the apply.Batch interface. +func (mb *ephemeralReplicaAppBatch) Close() { + *mb = ephemeralReplicaAppBatch{} +} + // ApplySideEffects implements the apply.StateMachine interface. The method // handles the third phase of applying a command to the replica state machine. 
// diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 62b3852e586f..35ab1b7b4364 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -546,6 +546,40 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } } + // If the ready struct includes entries that have been committed, these + // entries will be applied to the Replica's replicated state machine down + // below, after appending new entries to the raft log and sending messages + // to peers. However, the process of appending new entries to the raft log + // and then applying committed entries to the state machine can take some + // time - and these entries are already durably committed. If they have + // clients waiting on them, we'd like to acknowledge their success as soon + // as possible. To facilitate this, we take a quick pass over the committed + // entries and acknowledge as many as we can trivially prove will not be + // rejected beneath raft. + // + // Note that we only acknowledge up to the current last index in the Raft + // log. The CommittedEntries slice may contain entries that are also in the + // Entries slice (to be appended in this ready pass), and we don't want to + // acknowledge them until they are durably in our local Raft log. This is + // most common in single node replication groups, but it is possible when a + // follower in a multi-node replication group is catching up after falling + // behind. In the first case, the entries are not yet committed so + // acknowledging them would be a lie. In the second case, the entries are + // committed so we could acknowledge them at this point, but doing so seems + // risky. To avoid complications in either case, we pass lastIndex for the + // maxIndex argument to AckCommittedEntriesBeforeApplication. + sm := r.getStateMachine() + dec := r.getDecoder() + appTask := apply.MakeTask(sm, dec) + appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) + defer appTask.Close() + if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { + return stats, err.(*nonDeterministicFailure).safeExpl, err + } + if err := appTask.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil { + return stats, err.(*nonDeterministicFailure).safeExpl, err + } + // Separate the MsgApp messages from all other Raft message types so that we // can take advantage of the optimization discussed in the Raft thesis under // the section: `10.2.1 Writing to the leader’s disk in parallel`. The @@ -721,14 +755,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( applicationStart := timeutil.Now() if len(rd.CommittedEntries) > 0 { - sm := r.getStateMachine() - dec := r.getDecoder() - appTask := apply.MakeTask(sm, dec) - appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) - defer appTask.Close() - if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { - return stats, err.(*nonDeterministicFailure).safeExpl, err - } if err := appTask.ApplyCommittedEntries(ctx); err != nil { return stats, err.(*nonDeterministicFailure).safeExpl, err } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 6063ffa66aaf..8278ee487188 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2487,12 +2487,10 @@ func TestStoreScanMultipleIntents(t *testing.T) { // in a single batch. manual.Increment(txnwait.TxnLivenessThreshold.Nanoseconds() + 1) - // Query the range with a single INCONSISTENT scan, which should - // cause all intents to be resolved. 
+ // Query the range with a single scan, which should cause all intents + // to be resolved. sArgs := scanArgs(key1, key10.Next()) - if _, pErr := client.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{ - ReadConsistency: roachpb.INCONSISTENT, - }, &sArgs); pErr != nil { + if _, pErr := client.SendWrapped(context.Background(), store.TestSender(), &sArgs); pErr != nil { t.Fatal(pErr) }