Skip to content

Commit

Permalink
[DNM] storage/apply: support applying committed raft entries before a…
Browse files Browse the repository at this point in the history
…pplication

This will be pulled out into cockroachdb#38954 instead.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 6, 2019
1 parent 54a2da7 commit 5937762
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 24 deletions.
7 changes: 7 additions & 0 deletions pkg/storage/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ type CheckedCommand interface {
Command
// Rejected returns whether the command was rejected.
Rejected() bool
// CanAckBeforeApplication returns whether the success of the command
// can be acknowledged before the command has been applied to the state
// machine.
CanAckBeforeApplication() bool
// AckSuccess acknowledges the success of the command to its client.
// Must only be called if !Rejected.
AckSuccess() error
}

// AppliedCommand is a command that has been applied to the replicated state
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/apply/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ evaluated KV" (see docs/RFCS/20160420_proposer_evaluated_kv.md). With the
completion of that change, client responses are determined before replication
begins. The only remaining work to be done after replication of a command
succeeds is to determine whether it will be rejected and replaced by an empty
command.
command. To facilitate this acknowledgement as early as possible, this package
provides the ability to acknowledge a series of commands before applying them to
the state machine. Outcomes are determined before performing any durable work by
stepping commands through an in-memory "ephemeral" copy of the state machine.
For more, see Task.AckCommittedEntriesBeforeApplication.
A final challenge comes from the desire to properly prioritize the application
of commands across multiple state machines in systems like CockroachDB where
Expand Down
43 changes: 39 additions & 4 deletions pkg/storage/apply/doc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,46 @@ func ExampleTask() {
dec.nonLocal[6] = true
dec.shouldReject[3] = true
dec.shouldReject[6] = true
fmt.Print(`
Setting up a batch of seven log entries:
- index 2 and 6 are non-local
- index 3 and 6 will be rejected
- index 5 is not trivial
`)

t := apply.MakeTask(sm, dec)
defer t.Close()

fmt.Println("Decode:")
fmt.Println("\nDecode (note that index 2 and 6 are not local):")
if err := t.Decode(ctx, ents); err != nil {
panic(err)
}

fmt.Println("\nAckCommittedEntriesBeforeApplication:")
if err := t.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */); err != nil {
panic(err)
}
fmt.Print(`
Above, only index 1 and 4 get acked early. The command at 5 is
non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
must be in the first batch to qualify for acking early. 2 is not local
(so there's nobody to ack), and 3 is rejected. We can't ack rejected
commands early because the state machine is free to handle them any way
it likes.
`)

fmt.Println("\nApplyCommittedEntries:")
if err := t.ApplyCommittedEntries(ctx); err != nil {
panic(err)
}
// Output:
//
// Decode:
// Setting up a batch of seven log entries:
// - index 2 and 6 are non-local
// - index 3 and 6 will be rejected
// - index 5 is not trivial
//
// Decode (note that index 2 and 6 are not local):
// decoding command 1; local=true
// decoding command 2; local=false
// decoding command 3; local=true
Expand All @@ -53,16 +77,27 @@ func ExampleTask() {
// decoding command 6; local=false
// decoding command 7; local=true
//
// AckCommittedEntriesBeforeApplication:
// acknowledging command 1 before application
// acknowledging command 4 before application
//
// Above, only index 1 and 4 get acked early. The command at 5 is
// non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry
// must be in the first batch to qualify for acking early. 2 is not local
// (so there's nobody to ack), and 3 is rejected. We can't ack rejected
// commands early because the state machine is free to handle them any way
// it likes.
//
// ApplyCommittedEntries:
// committing batch with commands=[1 2 3 4]
// applying side-effects of command 1
// applying side-effects of command 2
// applying side-effects of command 3
// applying side-effects of command 4
// finishing and acknowledging command 1; rejected=false
// finishing command 1; rejected=false
// finishing and acknowledging command 2; rejected=false
// finishing and acknowledging command 3; rejected=true
// finishing and acknowledging command 4; rejected=false
// finishing command 4; rejected=false
// committing batch with commands=[5]
// applying side-effects of command 5
// finishing and acknowledging command 5; rejected=false
Expand Down
112 changes: 110 additions & 2 deletions pkg/storage/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,15 @@ type StateMachine interface {
// effects that a group of Commands will have on the replicated state
// machine. Commands are staged in the batch one-by-one and then the
// entire batch is committed at once.
NewBatch() Batch
//
// Batch comes in two flavors - real batches and ephemeral batches.
// Real batches are capable of accumulating updates from commands and
// applying them to the state machine. Ephemeral batches are not able
// to make changes to the durable state machine, but can still be used
// for the purpose of checking commands to determine whether they will
// be rejected or not when staged in a real batch. The principal user
// of ephemeral batches is AckCommittedEntriesBeforeApplication.
NewBatch(ephemeral bool) Batch
// ApplySideEffects applies the in-memory side-effects of a Command to
// the replicated state machine. The method will be called in the order
// that the commands are committed to the state machine's log. It will
Expand Down Expand Up @@ -107,6 +115,106 @@ func (t *Task) assertDecoded() {
}
}

// AckCommittedEntriesBeforeApplication attempts to acknowledge the success of
// raft entries that have been durably committed to the raft log but have not
// yet been applied to the proposer replica's replicated state machine.
//
// This is safe because a proposal through raft can be known to have succeeded
// as soon as it is durably replicated to a quorum of replicas (i.e. has
// committed in the raft log). The proposal does not need to wait for the
// effects of the proposal to be applied in order to know whether its changes
// will succeed or fail. This is because the raft log is the provider of
// atomicity and durability for replicated writes, not (ignoring log
// truncation) the replicated state machine itself.
//
// However, there are a few complications to acknowledging the success of a
// proposal at this stage:
//
// 1. Committing an entry in the raft log and having the command in that entry
// succeed are similar but not equivalent concepts. Even if the entry succeeds
// in achieving durability by replicating to a quorum of replicas, its command
// may still be rejected "beneath raft". This means that a (deterministic)
// check after replication decides that the command will not be applied to the
// replicated state machine. In that case, the client waiting on the result of
// the command should not be informed of its success. Luckily, this check is
// cheap to perform so we can do it here and when applying the command.
//
// Determining whether the command will succeed or be rejected before applying
// it for real is accomplished using an ephemeral batch. Commands are staged in
// the ephemeral batch to acquire CheckedCommands, which can then be acknowledged
// immediately even though the ephemeral batch itself cannot be used to update
// the durable state machine. Once the rejection status of each command is
// determined, any successful commands that permit acknowledgement before
// application (see CanAckBeforeApplication) are acknowledged. The ephemeral
// batch is then thrown away.
//
// 2. Some commands perform non-trivial work such as updating Replica configuration
// state or performing Range splits. In those cases, it's likely that the client
// is interested in not only knowing whether it has succeeded in sequencing the
// change in the raft log, but also in knowing when the change has gone into
// effect. There's currently no exposed hook to ask for an acknowledgement only
// after a command has been applied, so for simplicity the current implementation
// only ever acks transactional writes before they have gone into effect. All
// other commands wait until they have been applied to ack their client.
//
// 3. Even though we can determine whether a command has succeeded without applying
// it, the effect of the command will not be visible to conflicting commands until
// it is applied. Because of this, the client can be informed of the success of
// a write at this point, but we cannot release that write's latches until the
// write has applied. See ProposalData.signalProposalResult/finishApplication.
//
// 4. etcd/raft may provide a series of CommittedEntries in a Ready struct that
// haven't actually been appended to our own log. This is most common in single
// node replication groups, but it is possible when a follower in a multi-node
// replication group is catching up after falling behind. In the first case,
// the entries are not yet committed so acknowledging them would be a lie. In
// the second case, the entries are committed so we could acknowledge them at
// this point, but doing so seems risky. To avoid complications in either case,
// the method takes a maxIndex parameter that limits the indexes that it will
// acknowledge. Typically, callers will supply the highest index that they have
// durably written to their raft log for this upper bound.
//
func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxIndex uint64) error {
	t.assertDecoded()
	if !t.anyLocal {
		// Fast-path: skip the staging work below entirely.
		return nil
	}

	// The batch is ephemeral: it is used only to learn whether each command
	// would be rejected when staged in a real batch, and it is closed when
	// this method returns.
	ephemeralBatch := t.sm.NewBatch(true /* ephemeral */)
	defer ephemeralBatch.Close()

	cmds := t.dec.NewCommandIter()
	defer cmds.Close()

	// Take the leading run of commands that are trivial and whose index does
	// not exceed maxIndex. The first command that fails either condition ends
	// the run.
	eligible := func(cmd Command) bool {
		return cmd.Index() <= maxIndex && cmd.IsTrivial()
	}
	candidates := takeWhileCmdIter(cmds, eligible)

	// Stage the candidates in the ephemeral batch to obtain CheckedCommands.
	checked, err := mapCmdIter(candidates, ephemeralBatch.Stage)
	if err != nil {
		return err
	}

	// Acknowledge only commands that were not rejected, were proposed
	// locally, and permit acknowledgement before application. Rejected
	// proposals are never acknowledged early because the StateMachine may
	// want to retry the command instead of returning the error to the client.
	return forEachCheckedCmdIter(checked, func(cmd CheckedCommand) error {
		if cmd.Rejected() || !cmd.IsLocal() || !cmd.CanAckBeforeApplication() {
			return nil
		}
		return cmd.AckSuccess()
	})
}

// SetMaxBatchSize sets the maximum application batch size. If 0, no limit
// will be placed on the number of commands that can be applied in a batch.
func (t *Task) SetMaxBatchSize(size int) {
Expand Down Expand Up @@ -134,7 +242,7 @@ func (t *Task) ApplyCommittedEntries(ctx context.Context) error {
// b) exactly one non-trivial command
func (t *Task) applyOneBatch(ctx context.Context, iter CommandIterator) error {
// Create a new application batch.
batch := t.sm.NewBatch()
batch := t.sm.NewBatch(false /* ephemeral */)
defer batch.Close()

// Consume a batch-worth of commands.
Expand Down
98 changes: 91 additions & 7 deletions pkg/storage/apply/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/storage/apply"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/raftpb"
)
Expand Down Expand Up @@ -64,9 +65,15 @@ func (c *checkedCmd) AckSuccess() error {
}
func (c *appliedCmd) FinishAndAckOutcome() error {
c.finished = true
c.acked = true
if logging {
fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected())
if c.acked {
if logging {
fmt.Printf(" finishing command %d; rejected=%t\n", c.Index(), c.Rejected())
}
} else {
if logging {
fmt.Printf(" finishing and acknowledging command %d; rejected=%t\n", c.Index(), c.Rejected())
}
c.acked = true
}
return nil
}
Expand Down Expand Up @@ -114,12 +121,12 @@ func getTestStateMachine() *testStateMachine {
return new(testStateMachine)
}

func (sm *testStateMachine) NewBatch() apply.Batch {
func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch {
if sm.batchOpen {
panic("batch not closed")
}
sm.batchOpen = true
return &testBatch{sm: sm}
return &testBatch{sm: sm, ephemeral: ephemeral}
}
func (sm *testStateMachine) ApplySideEffects(
cmdI apply.CheckedCommand,
Expand All @@ -134,8 +141,9 @@ func (sm *testStateMachine) ApplySideEffects(
}

type testBatch struct {
sm *testStateMachine
staged []uint64
sm *testStateMachine
ephemeral bool
staged []uint64
}

func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
Expand All @@ -145,6 +153,9 @@ func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
return &ccmd, nil
}
func (b *testBatch) Commit(_ context.Context) error {
if b.ephemeral {
return errors.New("can't commit an ephemeral batch")
}
b.sm.batches = append(b.sm.batches, b.staged)
b.sm.applied = append(b.sm.applied, b.staged...)
if logging {
Expand Down Expand Up @@ -263,3 +274,76 @@ func TestApplyCommittedEntriesWithBatchSize(t *testing.T) {
require.True(t, cmd.finished)
}
}

// TestAckCommittedEntriesBeforeApplication checks which commands are
// acknowledged by Task.AckCommittedEntriesBeforeApplication, and that the
// state machine is left untouched and no command is finished by the call.
func TestAckCommittedEntriesBeforeApplication(t *testing.T) {
	ctx := context.Background()
	entries := makeEntries(9)

	sm := getTestStateMachine()
	dec := newTestDecoder()
	dec.nonTrivial[6] = true
	dec.nonTrivial[7] = true
	dec.nonTrivial[9] = true
	dec.nonLocal[2] = true
	dec.shouldReject[3] = true

	// First pass: maxIndex (10) is above every entry, so only triviality,
	// locality and rejection decide what gets acked.
	task := apply.MakeTask(sm, dec)
	defer task.Close()
	require.NoError(t, task.Decode(ctx, entries))
	require.NoError(t, task.AckCommittedEntriesBeforeApplication(ctx, 10 /* maxIndex */))

	// Acking early must leave the state machine at its zero value.
	require.Equal(t, testStateMachine{}, *sm)

	for _, cmd := range dec.cmds {
		// Commands 1, 4 and 5 are local and successful. Command 2 is remote,
		// 3 is rejected, and everything from 6 on follows the first
		// non-trivial command.
		wantAcked := cmd.index == 1 || cmd.index == 4 || cmd.index == 5
		require.Equal(t, wantAcked, cmd.acked)
		require.False(t, cmd.finished)
	}

	// Second pass: a fresh decoder with five entries and a maxIndex (4) that
	// excludes the last one.
	task.Close()
	entries = makeEntries(5)

	dec = newTestDecoder()
	dec.nonLocal[2] = true
	dec.shouldReject[3] = true

	task = apply.MakeTask(sm, dec)
	require.NoError(t, task.Decode(ctx, entries))
	require.NoError(t, task.AckCommittedEntriesBeforeApplication(ctx, 4 /* maxIndex */))

	// The state machine must still be untouched.
	require.Equal(t, testStateMachine{}, *sm)

	for _, cmd := range dec.cmds {
		var wantAcked bool
		switch cmd.index {
		case 1, 4:
			wantAcked = true // local and successful
		case 2, 3, 5:
			// Not acked: 2 is remote, 3 is rejected, 5 is above maxIndex.
		default:
			t.Fatalf("unexpected index %d", cmd.index)
		}
		require.Equal(t, wantAcked, cmd.acked)
		require.False(t, cmd.finished)
	}
}
Loading

0 comments on commit 5937762

Please sign in to comment.