Skip to content

Commit

Permalink
kvserver: decouple cmd checks in replicaAppBatch
Browse files Browse the repository at this point in the history
This refactors the command application pre-flight checks
on replicaAppBatch such that they can move to appBatch
once that struct evolves from a stub into an actual
implementation of `apply.Batch`.

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Dec 19, 2022
1 parent 369c405 commit e61de15
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 56 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
srcs = [
"addressing.go",
"allocation_op.go",
"app_batch.go",
"consistency_queue.go",
"debug_print.go",
"doc.go",
Expand Down
99 changes: 99 additions & 0 deletions pkg/kv/kvserver/app_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// appBatch is the in-progress foundation for standalone log entry
// application[^1], i.e. the act of applying raft log entries to the state
// machine in a library-style fashion, without a running CockroachDB server.
//
// The intended usage is as follows. Starting with a ReplicatedCmd per Entry,
//
//  1. check it via assertAndCheckCommand followed by toCheckedCmd
//  2. run pre-add triggers (which may augment the WriteBatch)
//  3. stage the WriteBatch into a pebble Batch
//  4. run post-add triggers (metrics, etc)
//
// When all Entries have been added, the batch can be committed. In the course
// of time, appBatch will become an implementation of apply.Batch itself; at the
// time of writing it is only used by the replicaAppBatch implementation of
// apply.Batch, which goes through the above steps while interspersing:
//
//	1a. testing interceptors between assertAndCheckCommand and toCheckedCmd
//	2b. pre-add triggers specific to online command application (e.g. acquiring locks
//	    during replica-spanning operations), and
//	4b. post-add triggers specific to online command application (e.g. updates to
//	    Replica in-mem state)
//
// The struct currently has no fields, so its zero value is ready to use and
// all inputs are passed to its methods explicitly.
//
// [^1]: https://github.com/cockroachdb/cockroach/issues/75729
type appBatch struct {
	// TODO(tbg): this will absorb the following fields from replicaAppBatch:
	//
	// - batch
	// - state
	// - changeRemovesReplica
}

// assertAndCheckCommand runs the pre-flight checks for a single replicated
// command before it is turned into a checked command.
//
// It first asserts that the command's raft index is non-zero and that it is
// exactly one past state.RaftAppliedIndex; if either assertion fails, an
// assertion error is returned and the remaining results are zero values.
// Otherwise the decision is delegated to kvserverbase.CheckForcedErr, whose
// lease index, rejection type and forced error are returned unchanged together
// with a nil error.
//
// The method does not modify cmd; callers are expected to hand the results to
// toCheckedCmd (possibly after adjusting them).
func (b *appBatch) assertAndCheckCommand(
	ctx context.Context, cmd *raftlog.ReplicatedCmd, state *kvserverpb.ReplicaState, isLocal bool,
) (leaseIndex uint64, _ kvserverbase.ProposalRejectionType, forcedErr *roachpb.Error, _ error) {
	raftIndex := cmd.Index()
	if log.V(4) {
		log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
			cmd.ID, raftIndex, cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp)
	}

	// A zero index is rejected before state is consulted at all.
	if raftIndex == 0 {
		return 0, 0, nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index")
	}
	// Entries must be applied in order and without gaps. An out-of-order index
	// indicates corruption, so there is no sense in trying to update anything
	// or running the command.
	if prevApplied := state.RaftAppliedIndex; raftIndex != prevApplied+1 {
		return 0, 0, nil, errors.AssertionFailedf("applied index jumped from %d to %d", prevApplied, raftIndex)
	}

	// TODO(sep-raft-log): move the closedts checks from replicaAppBatch here as
	// well. This just needs a bit more untangling as they reference *Replica, but
	// for no super-convincing reason.

	var rejection kvserverbase.ProposalRejectionType
	leaseIndex, rejection, forcedErr = kvserverbase.CheckForcedErr(ctx, cmd.ID, &cmd.Cmd, isLocal, state)
	return leaseIndex, rejection, forcedErr, nil
}

// toCheckedCmd records the outcome of the pre-flight checks on cmd by storing
// leaseIndex, rej and forcedErr in its LeaseIndex, Rejection and ForcedErr
// fields.
//
// If cmd.Rejected() then reports true, the command's payload is replaced by an
// empty one: the ReplicatedEvalResult is reset to its zero value and the
// WriteBatch, LogicalOpLog and ClosedTimestamp are cleared. Otherwise the
// payload is left untouched. Either way a trace event is emitted on ctx.
func (b *appBatch) toCheckedCmd(
	ctx context.Context,
	cmd *raftlog.ReplicatedCmd,
	leaseIndex uint64,
	rej kvserverbase.ProposalRejectionType,
	forcedErr *roachpb.Error,
) {
	cmd.LeaseIndex = leaseIndex
	cmd.Rejection = rej
	cmd.ForcedErr = forcedErr

	if !cmd.Rejected() {
		log.Event(ctx, "applying command")
		return
	}

	log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr)

	// Apply an empty command in place of the rejected one.
	cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
	cmd.Cmd.WriteBatch, cmd.Cmd.LogicalOpLog, cmd.Cmd.ClosedTimestamp = nil, nil, nil
}
64 changes: 31 additions & 33 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -94,39 +95,32 @@ func (b *replicaAppBatch) Stage(
ctx context.Context, cmdI apply.Command,
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)
if cmd.Index() == 0 {
return nil, errors.AssertionFailedf("processRaftCommand requires a non-zero index")

// We'll follow the steps outlined in appBatch's comment here, and will call
// into appBatch at appropriate times.
var ab appBatch

leaseIndex, rej, forcedErr, err := ab.assertAndCheckCommand(ctx, &cmd.ReplicatedCmd, &b.state, cmd.IsLocal())
if err != nil {
return nil, err
}
if idx, applied := cmd.Index(), b.state.RaftAppliedIndex; idx != applied+1 {
// If we have an out of order index, there's corruption. No sense in
// trying to update anything or running the command. Simply return.
return nil, errors.AssertionFailedf("applied index jumped from %d to %d", applied, idx)

// Then, maybe override the result with testing knobs.
if b.r.store.TestingKnobs() != nil {
rej, forcedErr = replicaApplyTestingFilters(ctx, b.r, cmd, rej, forcedErr)
}
if log.V(4) {
log.Infof(ctx, "processing command %x: raftIndex=%d maxLeaseIndex=%d closedts=%s",
cmd.ID, cmd.Index(), cmd.Cmd.MaxLeaseIndex, cmd.Cmd.ClosedTimestamp)
}

// Determine whether the command should be applied to the replicated state
// machine or whether it should be rejected (and replaced by an empty command).
// This check is deterministic on all replicas, so if one replica decides to
// reject a command, all will.
if !b.r.shouldApplyCommand(ctx, cmd, &b.state) {
log.VEventf(ctx, 1, "applying command with forced error: %s", cmd.ForcedErr)

// Apply an empty command.
cmd.Cmd.ReplicatedEvalResult = kvserverpb.ReplicatedEvalResult{}
cmd.Cmd.WriteBatch = nil
cmd.Cmd.LogicalOpLog = nil
cmd.Cmd.ClosedTimestamp = nil
} else {
if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil {
return nil, err
}
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
return nil, err
}
log.Event(ctx, "applying command")

// Now update cmd. We'll either put the lease index in it or zero out
// the cmd in case there's a forced error.
ab.toCheckedCmd(ctx, &cmd.ReplicatedCmd, leaseIndex, rej, forcedErr)

// TODO(tbg): these assertions should be pushed into
// (*appBatch).assertAndCheckCommand.
if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil {
return nil, err
}
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
return nil, err
}

// Acquire the split or merge lock, if necessary. If a split or merge
Expand Down Expand Up @@ -791,8 +785,12 @@ func (mb *ephemeralReplicaAppBatch) Stage(
) (apply.CheckedCommand, error) {
cmd := cmdI.(*replicatedCmd)

mb.r.shouldApplyCommand(ctx, cmd, &mb.state)
mb.state.LeaseAppliedIndex = cmd.LeaseIndex
leaseIndex, rejection, forcedErr := kvserverbase.CheckForcedErr(
ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), &mb.state,
)
rejection, forcedErr = replicaApplyTestingFilters(ctx, mb.r, cmd, rejection, forcedErr)
cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = leaseIndex, rejection, forcedErr

return cmd, nil
}

Expand Down
48 changes: 25 additions & 23 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,43 +79,45 @@ func (r *Replica) getStateMachine() *replicaStateMachine {
return sm
}

// shouldApplyCommand determines whether or not a command should be applied to
// the replicated state machine after it has been committed to the Raft log. It
// then sets the provided command's LeaseIndex, Rejection, and ForcedErr
// fields and returns whether command should be applied or rejected.
func (r *Replica) shouldApplyCommand(
ctx context.Context, cmd *replicatedCmd, replicaState *kvserverpb.ReplicaState,
) bool {
cmd.LeaseIndex, cmd.Rejection, cmd.ForcedErr = kvserverbase.CheckForcedErr(
ctx, cmd.ID, &cmd.Cmd, cmd.IsLocal(), replicaState,
)
// Consider testing-only filters.
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; cmd.ForcedErr != nil || filter != nil {
// TODO(tbg): move this to replica_app_batch.go.
func replicaApplyTestingFilters(
ctx context.Context,
r *Replica,
cmd *replicatedCmd,
rejection kvserverbase.ProposalRejectionType,
forcedErr *roachpb.Error,
) (newRejection kvserverbase.ProposalRejectionType, newForcedErr *roachpb.Error) {
// By default, output is input.
newRejection = rejection
newForcedErr = forcedErr

// Filters may change that.
if filter := r.store.cfg.TestingKnobs.TestingApplyCalledTwiceFilter; forcedErr != nil || filter != nil {
args := kvserverbase.ApplyFilterArgs{
CmdID: cmd.ID,
ReplicatedEvalResult: *cmd.ReplicatedResult(),
StoreID: r.store.StoreID(),
RangeID: r.RangeID,
ForcedError: cmd.ForcedErr,
ForcedError: forcedErr,
}
if cmd.ForcedErr == nil {
if forcedErr == nil {
if cmd.IsLocal() {
args.Req = cmd.proposal.Request
}
newPropRetry, newForcedErr := filter(args)
cmd.ForcedErr = newForcedErr
if cmd.Rejection == 0 {
cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry)
var newRej int
newRej, newForcedErr = filter(args)
if rejection == 0 {
newRejection = kvserverbase.ProposalRejectionType(newRej)
}
} else if feFilter := r.store.cfg.TestingKnobs.TestingApplyForcedErrFilter; feFilter != nil {
newPropRetry, newForcedErr := filter(args)
cmd.ForcedErr = newForcedErr
if cmd.Rejection == 0 {
cmd.Rejection = kvserverbase.ProposalRejectionType(newPropRetry)
var newRej int
newRej, newForcedErr = filter(args)
if rejection == 0 {
newRejection = kvserverbase.ProposalRejectionType(newRej)
}
}
}
return cmd.ForcedErr == nil
return newRejection, newForcedErr
}

// NewEphemeralBatch implements the apply.StateMachine interface.
Expand Down

0 comments on commit e61de15

Please sign in to comment.