Skip to content

Commit

Permalink
kvserver/apply: use a better ctx for cmd.AckSuccess
Browse files Browse the repository at this point in the history
Before this patch, CheckedCommand.AckSuccess() was called with a Raft
worker context. That's wasn't great because each command captures a
better context to use - one that derives from the proposal's ctx in the
case of local proposals. This patch switches to using that by exposing
the captured context through the Command interface. Taking advantage of
the new ctx, we also log a message now about early acks, as it seems
like a notable hint to see in a trace.

This patch also cleans up most existing uses of that captured context to
use the new interface method; before, various code paths were type
asserting the implementation of the Command, and getting the internal
context that way. This patch moves the resposibility of deciding what
context to use upwards, to callers.

Release note: None
  • Loading branch information
andreimatei committed Nov 18, 2021
1 parent f16c6a2 commit d064059
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 24 deletions.
18 changes: 14 additions & 4 deletions pkg/kv/kvserver/apply/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ type Command interface {
// that were locally proposed typically have a client waiting on a
// response, so there is additional urgency to apply them quickly.
IsLocal() bool
// Ctx returns the Context in which operations on this Command should be
// performed.
//
// A Command does the unusual thing of capturing a Context because commands
// are generally processed in batches, but different commands might want their
// events going to different places. In particular, commands that have been
// proposed locally get a tracing span tied to the local proposal.
Ctx() context.Context
// AckErrAndFinish signals that the application of the command has been
// rejected due to the provided error. It also relays this rejection of
// the command to its client if it was proposed locally. An error will
Expand Down Expand Up @@ -167,12 +175,13 @@ func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIter
// responsible for converting Commands into CheckedCommand. The function
// closes the provided iterator.
func mapCmdIter(
iter CommandIterator, fn func(Command) (CheckedCommand, error),
iter CommandIterator, fn func(context.Context, Command) (CheckedCommand, error),
) (CheckedCommandIterator, error) {
defer iter.Close()
ret := iter.NewCheckedList()
for iter.Valid() {
checked, err := fn(iter.Cur())
cur := iter.Cur()
checked, err := fn(cur.Ctx(), cur)
if err != nil {
ret.Close()
return nil, err
Expand All @@ -188,12 +197,13 @@ func mapCmdIter(
// is responsible for converting CheckedCommand into AppliedCommand. The
// function closes the provided iterator.
func mapCheckedCmdIter(
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
iter CheckedCommandIterator, fn func(context.Context, CheckedCommand) (AppliedCommand, error),
) (AppliedCommandIterator, error) {
defer iter.Close()
ret := iter.NewAppliedList()
for iter.Valid() {
applied, err := fn(iter.CurChecked())
curChecked := iter.CurChecked()
applied, err := fn(curChecked.Ctx(), curChecked)
if err != nil {
ret.Close()
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type StateMachine interface {
// an untimely crash. This means that applying these side-effects will
// typically update the in-memory representation of the state machine
// to the same state that it would be in if the process restarted.
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the task
Expand All @@ -67,7 +67,7 @@ var ErrRemoved = errors.New("replica removed")
type Batch interface {
// Stage inserts a Command into the Batch. In doing so, the Command is
// checked for rejection and a CheckedCommand is returned.
Stage(Command) (CheckedCommand, error)
Stage(context.Context, Command) (CheckedCommand, error)
// ApplyToStateMachine applies the persistent state transitions staged
// in the Batch to the StateMachine, atomically.
ApplyToStateMachine(context.Context) error
Expand Down Expand Up @@ -225,7 +225,7 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
// want to retry the command instead of returning the error to the client.
return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error {
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
return cmd.AckSuccess(ctx)
return cmd.AckSuccess(cmd.Ctx())
}
return nil
})
Expand Down
11 changes: 6 additions & 5 deletions pkg/kv/kvserver/apply/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type appliedCmd struct {
*checkedCmd
}

func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
func (c *cmd) Index() uint64 { return c.index }
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
func (c *cmd) IsLocal() bool { return !c.nonLocal }
func (c *cmd) Ctx() context.Context { return context.Background() }
func (c *cmd) AckErrAndFinish(_ context.Context, err error) error {
c.acked = true
c.finished = true
Expand Down Expand Up @@ -138,7 +139,7 @@ func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch {
return &testBatch{sm: sm, ephemeral: ephemeral}
}
func (sm *testStateMachine) ApplySideEffects(
cmdI apply.CheckedCommand,
_ context.Context, cmdI apply.CheckedCommand,
) (apply.AppliedCommand, error) {
cmd := cmdI.(*checkedCmd)
sm.appliedSideEffects = append(sm.appliedSideEffects, cmd.index)
Expand All @@ -160,7 +161,7 @@ type testBatch struct {
staged []uint64
}

func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (b *testBatch) Stage(_ context.Context, cmdI apply.Command) (apply.CheckedCommand, error) {
cmd := cmdI.(*cmd)
b.staged = append(b.staged, cmd.index)
ccmd := checkedCmd{cmd: cmd, rejected: cmd.shouldReject}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (c *replicatedCmd) IsLocal() bool {
return c.proposal != nil
}

// Ctx implements the apply.Command interface.
func (c *replicatedCmd) Ctx() context.Context {
return c.ctx
}

// AckErrAndFinish implements the apply.Command interface.
func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error {
if c.IsLocal() {
Expand Down Expand Up @@ -143,7 +148,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool {
}

// AckSuccess implements the apply.CheckedCommand interface.
func (c *replicatedCmd) AckSuccess(_ context.Context) error {
func (c *replicatedCmd) AckSuccess(ctx context.Context) error {
if !c.IsLocal() {
return nil
}
Expand All @@ -158,6 +163,7 @@ func (c *replicatedCmd) AckSuccess(_ context.Context) error {
resp.Reply = &reply
resp.EncounteredIntents = c.proposal.Local.DetachEncounteredIntents()
resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */)
log.Event(ctx, "ack-ing replication success to the client; application will continue async w.r.t. the client")
c.proposal.signalProposalResult(resp)
return nil
}
Expand Down
21 changes: 12 additions & 9 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,10 @@ type replicaAppBatch struct {
// the batch. This allows the batch to make an accurate determination about
// whether to accept or reject the next command that is staged without needing
// to actually update the replica state machine in between.
func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (b *replicaAppBatch) Stage(
ctx context.Context, cmdI apply.Command,
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx
if cmd.ent.Index == 0 {
return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index")
}
Expand All @@ -457,7 +458,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
cmd.raftCmd.LogicalOpLog = nil
cmd.raftCmd.ClosedTimestamp = nil
} else {
if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil {
if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil {
return nil, err
}
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
Expand Down Expand Up @@ -992,7 +993,9 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd)

// Assert that the closed timestamp carried by the command is not below one from
// previous commands.
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error {
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(
ctx context.Context, cmd *replicatedCmd,
) error {
if !raftClosedTimestampAssertionsEnabled {
return nil
}
Expand All @@ -1012,7 +1015,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm
prevReq.SafeString("<unknown; not leaseholder or not lease request>")
}

logTail, err := b.r.printRaftTail(cmd.ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
logTail, err := b.r.printRaftTail(ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
if err != nil {
if logTail != "" {
logTail = logTail + "\n; error printing log: " + err.Error()
Expand Down Expand Up @@ -1043,9 +1046,10 @@ type ephemeralReplicaAppBatch struct {
}

// Stage implements the apply.Batch interface.
func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
func (mb *ephemeralReplicaAppBatch) Stage(
ctx context.Context, cmdI apply.Command,
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx

mb.r.shouldApplyCommand(ctx, cmd, &mb.state)
mb.state.LeaseAppliedIndex = cmd.leaseIndex
Expand All @@ -1071,10 +1075,9 @@ func (mb *ephemeralReplicaAppBatch) Close() {
// side effects of commands, such as finalizing splits/merges and informing
// raft about applied config changes.
func (sm *replicaStateMachine) ApplySideEffects(
cmdI apply.CheckedCommand,
ctx context.Context, cmdI apply.CheckedCommand,
) (apply.AppliedCommand, error) {
cmd := cmdI.(*replicatedCmd)
ctx := cmd.ctx

// Deal with locking during side-effect handling, which is sometimes
// associated with complex commands such as splits and merged.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_application_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
},
}

checkedCmd, err := b.Stage(cmd)
checkedCmd, err := b.Stage(cmd.ctx, cmd)
require.NoError(t, err)
require.Equal(t, !add, b.changeRemovesReplica)
require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index)
Expand All @@ -129,7 +129,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
require.NoError(t, err)

// Apply the side effects of the command to the StateMachine.
_, err = sm.ApplySideEffects(checkedCmd)
_, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd)
if add {
require.NoError(t, err)
} else {
Expand Down

0 comments on commit d064059

Please sign in to comment.