[DNM] storage/apply: support applying committed raft entries before application

This will be pulled out into cockroachdb#38954 instead.

Release note: None
nvanbenschoten committed Aug 2, 2019
1 parent fc95a5a commit d2927f7
Showing 6 changed files with 294 additions and 13 deletions.
7 changes: 7 additions & 0 deletions pkg/storage/apply/cmd.go
@@ -31,6 +31,13 @@ type CheckedCommand interface {
Command
// Rejected returns whether the command was rejected.
Rejected() bool
// CanAckBeforeApplication returns whether the success of the command
// can be acknowledged before the command has been applied to the state
// machine.
CanAckBeforeApplication() bool
// AckSuccess acknowledges the success of the command to its client.
// Should only be called if !Rejected.
AckSuccess() error
}

// AppliedCommand is a command that has been applied to the replicated state
101 changes: 99 additions & 2 deletions pkg/storage/apply/task.go
@@ -31,7 +31,14 @@ type StateMachine interface {
// effects that a group of Commands will have on the replicated state
// machine. Commands are staged in the batch one-by-one and then the
// entire batch is committed at once.
NewBatch() Batch
//
// Batches come in two flavors - real batches and mock batches. Real
// batches are capable of accumulating updates from commands and
// applying them to the state machine. Mock batches are not able to
// make changes to the state machine, but can still be used for the
// purpose of checking commands to determine whether they will be
// rejected or not when staged in a real batch.
NewBatch(mock bool) Batch
// ApplySideEffects applies the in-memory side-effects of a Command to
// the replicated state machine. The method will be called in the order
// that the commands are committed to the state machine's log. It will
@@ -124,6 +131,96 @@ func (t *Task) assertDecoded() {
}
}

// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of
// raft entries that have been durably committed to the raft log but have not
// yet been applied to the proposer replica's replicated state machine.
//
// This is safe because a proposal through raft is known to have succeeded as
// soon as it is replicated to a quorum of replicas (i.e. has committed in the
// raft log). The proposal does not need to wait for the effects of the proposal
// to be applied in order to know whether its changes will succeed or fail. This
// is because the raft log is the provider of atomicity and durability for
// replicated writes, not (ignoring log truncation) the replicated state machine
// itself.
//
// However, there are a few complications to acknowledging the success of a
// proposal at this stage:
//
// 1. Committing an entry in the raft log and having the command in that entry
// succeed are similar but not equivalent concepts. Even if the entry succeeds
// in achieving durability by replicating to a quorum of replicas, its command
// may still be rejected "beneath raft". This means that a (deterministic)
// check after replication decides that the command will not be applied to the
// replicated state machine. In that case, the client waiting on the result of
// the command should not be informed of its success. Luckily, this check is
// cheap to perform so we can do it here and when applying the command. See
// Replica.checkShouldApplyCommand.
//
// 2. Some commands perform non-trivial work such as updating Replica configuration
// state or performing Range splits. In those cases, it's likely that the client
// is interested in not only knowing whether it has succeeded in sequencing the
// change in the raft log, but also in knowing when the change has gone into
// effect. There's currently no exposed hook to ask for an acknowledgement only
// after a command has been applied, so for simplicity the current implementation
// only ever acks transactional writes before they have gone into effect. All
// other commands wait until they have been applied to ack their client.
//
// 3. Even though we can determine whether a command has succeeded without applying
// it, the effect of the command will not be visible to conflicting commands until
// it is applied. Because of this, the client can be informed of the success of
// a write at this point, but we cannot release that write's latches until the
// write has applied. See ProposalData.signalProposalResult/finishApplication.
//
// 4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
// haven't actually been appended to our own log. This is most common in single
// node replication groups, but it is possible when a follower in a multi-node
// replication group is catching up after falling behind. In the first case,
// the entries are not yet committed so acknowledging them would be a lie. In
// the second case, the entries are committed so we could acknowledge them at
// this point, but doing so seems risky. To avoid complications in either case,
// the method takes a maxIndex parameter that limits the indexes that it will
// acknowledge. Typically, callers will supply the highest index that they have
// durably written to their raft log for this upper bound.
//
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
t.assertDecoded()
if !t.anyLocal {
return nil // fast-path
}

// Create a new mock application batch. All we're interested in is whether
// commands will be rejected or not when staged in a real batch.
batch := t.sm.NewBatch(true /* mock */)
defer batch.Close()

iter := t.dec.NewCommandIter()
defer iter.Close()

// Collect a batch of trivial commands from the applier. Stop at the first
// non-trivial command or at the first command with an index above maxIndex.
batchIter := takeWhileCmdIter(iter, func(cmd Command) bool {
if cmd.Index() > maxIndex {
return false
}
return cmd.IsTrivial()
})

// Stage the commands in the (mock) batch.
stagedIter, err := mapCmdIter(batchIter, batch.Stage)
if err != nil {
return err
}

// Acknowledge any locally-proposed commands that succeeded in being staged
// in the batch and can be acknowledged before they are actually applied.
return forEachCheckedCmdIter(stagedIter, func(cmd CheckedCommand) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess()
}
return nil
})
}
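
The selection logic described in the comment above can be sketched in isolation. This is a minimal, self-contained illustration and not the code from this commit: the `cmd` struct, its fields, and `earlyAckable` are hypothetical stand-ins for the `Command`/`CheckedCommand` interfaces, and the `rejected` flag stands in for the result of staging a command in a mock batch.

```go
package main

import "fmt"

// cmd is a hypothetical stand-in for a decoded raft command.
type cmd struct {
	index    uint64
	trivial  bool
	local    bool
	rejected bool // assumed outcome of the below-raft check
	canAck   bool // stand-in for CanAckBeforeApplication
}

// earlyAckable returns the indexes of commands that could be acknowledged
// before application. It stops at the first non-trivial command or at the
// first command whose index exceeds maxIndex, and within that prefix it
// keeps only local, non-rejected commands that allow early acks.
func earlyAckable(cmds []cmd, maxIndex uint64) []uint64 {
	var acked []uint64
	for _, c := range cmds {
		if c.index > maxIndex || !c.trivial {
			break // take-while: nothing past this point is considered
		}
		if !c.rejected && c.local && c.canAck {
			acked = append(acked, c.index)
		}
	}
	return acked
}

func main() {
	cmds := []cmd{
		{index: 1, trivial: true, local: true, canAck: true},
		{index: 2, trivial: true, local: false, canAck: true},
		{index: 3, trivial: true, local: true, rejected: true, canAck: true},
		{index: 4, trivial: true, local: true, canAck: true},
		{index: 5, trivial: true, local: true, canAck: true},
		{index: 6, trivial: false, local: true, canAck: true},
	}
	fmt.Println(earlyAckable(cmds, 10)) // prints [1 4 5]
	fmt.Println(earlyAckable(cmds, 4))  // prints [1 4]
}
```

The loop breaks rather than skips on a non-trivial command or an out-of-range index, which matches the take-while behavior of the iterator in the function above: a trivial command that follows a non-trivial one is never acknowledged early.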

// SetMaxBatchSize sets the maximum application batch size. If 0, no limit
// will be placed on the number of commands that can be applied in a batch.
func (t *Task) SetMaxBatchSize(size int) {
@@ -151,7 +248,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
// b) exactly one non-trivial command
func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
// Create a new application batch.
batch := t.sm.NewBatch()
batch := t.sm.NewBatch(false /* mock */)
defer batch.Close()

// Consume a batch-worth of commands.
77 changes: 77 additions & 0 deletions pkg/storage/apply/task_test.go
@@ -216,3 +216,80 @@ func TestApplyCommittedEntriesWithBatchSize(t *testing.T) {
require.True(t, cmd.finished)
}
}

func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
ctx := context.Background()
sm := testingStateMachine{}
dec := testingDecoder{cmds: []cmd{
{index: 1, trivial: true, local: true, shouldReject: false},
{index: 2, trivial: true, local: false, shouldReject: false},
{index: 3, trivial: true, local: true, shouldReject: true},
{index: 4, trivial: true, local: true, shouldReject: false},
{index: 5, trivial: true, local: true, shouldReject: false},
{index: 6, trivial: false, local: true, shouldReject: false},
{index: 7, trivial: false, local: true, shouldReject: false},
{index: 8, trivial: true, local: true, shouldReject: false},
{index: 9, trivial: false, local: true, shouldReject: false},
}}

// Use an apply.Task to ack all commands before applying them.
appT := apply.MakeTask(&sm, &dec)
defer appT.Close()
require.NoError(t, appT.Decode(ctx, nil /* ents */))
require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */))

// Assert that the state machine was not updated.
require.Equal(t, testingStateMachine{}, sm)

// Assert that some commands were acknowledged early and that none were finished.
for _, cmd := range dec.cmds {
var exp bool
switch cmd.index {
case 1, 4, 5:
exp = true // local and successful
case 2:
exp = false // remote
case 3:
exp = false // local and rejected
default:
exp = false // after first non-trivial cmd
}
require.Equal(t, exp, cmd.acked)
require.False(t, cmd.finished)
}

// Try again with a lower maximum log index.
appT.Close()
appT = apply.MakeTask(&sm, &dec)
dec = testingDecoder{cmds: []cmd{
{index: 1, trivial: true, local: true, shouldReject: false},
{index: 2, trivial: true, local: false, shouldReject: false},
{index: 3, trivial: true, local: true, shouldReject: true},
{index: 4, trivial: true, local: true, shouldReject: false},
{index: 5, trivial: true, local: true, shouldReject: false},
}}
require.NoError(t, appT.Decode(ctx, nil /* ents */))
require.NoError(t, appT.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))

// Assert that the state machine was not updated.
require.Equal(t, testingStateMachine{}, sm)

// Assert that some commands were acknowledged early and that none were finished.
for _, cmd := range dec.cmds {
var exp bool
switch cmd.index {
case 1, 4:
exp = true // local and successful
case 2:
exp = false // remote
case 3:
exp = false // local and rejected
case 5:
exp = false // index too high
default:
t.Fatalf("unexpected index %d", cmd.index)
}
require.Equal(t, exp, cmd.acked)
require.False(t, cmd.finished)
}
}
38 changes: 38 additions & 0 deletions pkg/storage/cmd_app_ctx.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/raftpb"
)
@@ -156,6 +157,43 @@ func (c *cmdAppCtx) Rejected() bool {
return c.forcedErr != nil
}

// CanAckBeforeApplication implements the apply.CheckedCommand interface.
func (c *cmdAppCtx) CanAckBeforeApplication() bool {
// CanAckBeforeApplication determines whether the request type is compatible
// with acknowledgement of success before it has been applied. For now, this
// determination is based on whether the request is performing transactional
// writes. This could be exposed as an option on the batch header instead.
//
// We don't try to ack async consensus writes before application because we
// know that there isn't a client waiting for the result.
req := c.proposal.Request
return req.IsTransactionWrite() && !req.AsyncConsensus
}

// AckSuccess implements the apply.CheckedCommand interface.
func (c *cmdAppCtx) AckSuccess() error {
if !c.IsLocal() {
return nil
}
// If the local command isn't already managing its own tracing span, fork
// the request's context so that it can outlive the proposer's context.
if c.proposal.sp == nil {
c.proposal.ctx, c.proposal.sp = tracing.ForkCtxSpan(c.ctx, "command application")
c.ctx = c.proposal.ctx
}

// Signal the proposal's response channel with the result.
// Make a copy of the response to avoid data races.
var resp proposalResult
reply := *c.proposal.Local.Reply
reply.Responses = append([]roachpb.ResponseUnion(nil), reply.Responses...)
resp.Reply = &reply
resp.Intents = c.proposal.Local.DetachIntents()
resp.EndTxns = c.proposal.Local.DetachEndTxns(false)
c.proposal.signalProposalResult(resp)
return nil
}
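
The "make a copy of the response to avoid data races" step above relies on a common Go idiom: copy the struct by value, then give its slice field a fresh backing array with `append([]T(nil), s...)`. The sketch below shows the idiom on its own. The `reply` type and `snapshot` helper are hypothetical and use strings in place of `roachpb.ResponseUnion`.

```go
package main

import "fmt"

// reply is a hypothetical stand-in for a batch response.
type reply struct {
	Responses []string
}

// snapshot returns a copy of r whose Responses slice has its own backing
// array, so later writes to r.Responses are not visible through the copy.
// The elements themselves are copied by value (a shallow copy).
func snapshot(r *reply) *reply {
	cp := *r
	cp.Responses = append([]string(nil), r.Responses...)
	return &cp
}

func main() {
	orig := &reply{Responses: []string{"put", "get"}}
	cp := snapshot(orig)
	orig.Responses[0] = "mutated"
	fmt.Println(cp.Responses[0]) // prints put
}
```

Copying the struct alone would not be enough, because both copies would still share one backing array for the slice. Note that this only protects the slice itself; anything the elements point to remains shared.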

// AckOutcomeAndFinish implements the apply.AppliedCommand interface.
func (c *cmdAppCtx) AckOutcomeAndFinish() error {
if !c.IsLocal() {
43 changes: 41 additions & 2 deletions pkg/storage/replica_application_impl.go
@@ -173,8 +173,10 @@ type applyCommittedEntriesStats struct {
// side-effects of each command is applied to the Replica's in-memory state.
type replicaApplier struct {
r *Replica
// batch is returned from NewBatch().
// batch is returned from NewBatch(false /* mock */).
batch replicaAppBatch
// mockBatch is returned from NewBatch(true /* mock */).
mockBatch mockReplicaAppBatch
// stats are updated during command application and reset by moveStats.
stats applyCommittedEntriesStats
}
@@ -386,8 +388,16 @@ func checkForcedErr(
}

// NewBatch implements the apply.StateMachine interface.
func (a *replicaApplier) NewBatch() apply.Batch {
func (a *replicaApplier) NewBatch(mock bool) apply.Batch {
r := a.r
if mock {
mb := &a.mockBatch
mb.r = r
r.mu.RLock()
mb.state = r.mu.state
r.mu.RUnlock()
return mb
}
b := &a.batch
b.r = r
b.a = a
@@ -835,6 +845,35 @@ func (b *replicaAppBatch) Close() {
*b = replicaAppBatch{}
}

// mockReplicaAppBatch implements the apply.Batch interface.
//
// The batch performs the bare-minimum amount of work to be able to
// determine whether a replicated command should be rejected or applied.
type mockReplicaAppBatch struct {
r *Replica
state storagepb.ReplicaState
}

// Stage implements the apply.Batch interface.
func (mb *mockReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
cmd := cmdI.(*cmdAppCtx)
ctx := cmd.ctx

mb.r.checkShouldApplyCommand(ctx, cmd, &mb.state)
mb.state.LeaseAppliedIndex = cmd.leaseIndex
return cmd, nil
}

// Commit implements the apply.Batch interface.
func (mb *mockReplicaAppBatch) Commit(ctx context.Context) error {
panic("cannot commit mockReplicaAppBatch")
}

// Close implements the apply.Batch interface.
func (mb *mockReplicaAppBatch) Close() {
*mb = mockReplicaAppBatch{}
}
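
The idea behind the mock batch can be illustrated with a small, self-contained sketch. It is not the code from this commit. All names are hypothetical, and the rejection rule is a deliberately simplified assumption: a command is rejected when its lease index does not advance the applied lease index. The real check lives in `Replica.checkShouldApplyCommand` and covers more cases. The point of the sketch is that the mock batch works on a private copy of the state, so it can predict rejections across a run of commands without mutating the state machine.

```go
package main

import "fmt"

// state is a hypothetical stand-in for the replica state.
type state struct {
	leaseAppliedIndex uint64
}

// command is a hypothetical stand-in for a decoded raft command.
type command struct {
	leaseIndex uint64
	rejected   bool
}

// mockBatch checks commands against a private copy of the state.
type mockBatch struct {
	st state
}

// newMockBatch copies *s so that staging never touches the shared state.
func newMockBatch(s *state) *mockBatch {
	return &mockBatch{st: *s}
}

// stage records whether c would be rejected (simplified rule, see above)
// and, if it is accepted, advances the private copy of the state so that
// later commands in the same batch are checked against it.
func (b *mockBatch) stage(c *command) {
	c.rejected = c.leaseIndex <= b.st.leaseAppliedIndex
	if !c.rejected {
		b.st.leaseAppliedIndex = c.leaseIndex
	}
}

// commit is never valid on a mock batch.
func (b *mockBatch) commit() {
	panic("cannot commit mockBatch")
}

func main() {
	shared := &state{leaseAppliedIndex: 5}
	mb := newMockBatch(shared)
	first := &command{leaseIndex: 6}
	replay := &command{leaseIndex: 6} // same lease index proposed again
	mb.stage(first)
	mb.stage(replay)
	fmt.Println(first.rejected, replay.rejected, shared.leaseAppliedIndex) // prints false true 5
}
```

Tracking the lease applied index inside the batch is what lets the second command be recognized as a replay even though neither command has been applied to the shared state.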

// ApplySideEffects implements the apply.StateMachine interface. The method
// handles the third phase of applying a command to the replica state machine.
//
41 changes: 32 additions & 9 deletions pkg/storage/replica_raft.go
@@ -543,6 +543,38 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}
}

// If the ready struct includes entries that have been committed, these
// entries will be applied to the Replica's replicated state machine down
// below, after appending new entries to the raft log and sending messages
// to peers. However, the process of appending new entries to the raft log
// and then applying committed entries to the state machine can take some
// time - and these entries are already durably committed. If they have
// clients waiting on them, we'd like to acknowledge their success as soon
// as possible. To facilitate this, we take a quick pass over the committed
// entries and acknowledge as many as we can trivially prove will not be
// rejected beneath raft.
//
// We only try to ack committed entries before applying them if we have
// clients waiting on the result of Raft entries. If not, we know none of
// the committed entries were proposed locally.
//
// We share the entry generator and command application batch with the call
// to handleCommittedEntriesRaftMuLocked down below to avoid duplicating any
// decoding work.
app := r.getApplier()
dec := r.getDecoder()
appTask := apply.MakeTask(app, dec)
appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
defer appTask.Close()
if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
// TODO(WIP): Propagate errExpl.
return stats, "", err
}
if err := appTask.AckCommittedEntriesBeforeApplication(ctx, lastIndex); err != nil {
// TODO(WIP): Propagate errExpl.
return stats, "", err
}

// Separate the MsgApp messages from all other Raft message types so that we
// can take advantage of the optimization discussed in the Raft thesis under
// the section: `10.2.1 Writing to the leader’s disk in parallel`. The
@@ -718,15 +750,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked(

applicationStart := timeutil.Now()
if len(rd.CommittedEntries) > 0 {
app := r.getApplier()
dec := r.getDecoder()
appTask := apply.MakeTask(app, dec)
appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize)
defer appTask.Close()
if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
// TODO(WIP): Propagate errExpl.
return stats, "", err
}
if err := appTask.ApplyCommittedEntries(ctx); err != nil {
// TODO(WIP): Propagate errExpl.
return stats, "", err