Skip to content

Commit

Permalink
kvserver: fix bug in ephemeral app batch
Browse files Browse the repository at this point in the history
A bug was introduced recently[^1] which could lead to commands being
early-acked despite ultimately being rejected during command
application. The reason for this was that we previously updated
the lease applied index of the ephemeral batch and didn't any more,
meaning that commands that would fail the MaxLeaseIndex check on the
actual apply batch could erroneously pass it on the ephemeral batch.

Actually, I believe that change only made an existing bug (or at least
semantic incorrectness) more common: it was removing that update
altogether, but the old code blindly updated the MaxLeaseIndex, which
also doesn't seem correct as not all commands (in particular leases)
specify it in the first place. So conceivably, a command that would
be rejected could have reset the MaxLeaseIndex and then allowed a
later command that should also have been rejected to pass.

I think this was mostly theoretical in nature since we only batch
"trivial" commands but this PR improves the update to only occur
for non-rejected commands.

A regression test is added.

Thanks to Nathan for bisecting and diagnosis.

No release note because unreleased.

Fixes cockroachdb#94409.

[^1]: cockroachdb@e61de15#diff-0fd06523f1f485024aef0c7a11d3472945d5ac7cf228d6007b2475ccf6f44cd6L795

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Jan 9, 2023
1 parent 6c619e2 commit 8b1c02f
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_app_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,9 @@ func (mb *ephemeralReplicaAppBatch) Stage(
)
fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr)
cmd.ForcedErrResult = fr
if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex {
mb.state.LeaseAppliedIndex = cmd.LeaseIndex
}

return cmd, nil
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/kv/kvserver/replica_application_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -395,3 +397,84 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) {
})
})
}

// TestReplicaStateMachineEphemeralAppBatchRejection is a regression test for
// #94409. It verifies that if two commands are in an ephemeral batch but the
// first command's MaxLeaseIndex prevents the second command from succeeding, we
// don't accidentally ack the second command.
func TestReplicaStateMachineEphemeralAppBatchRejection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
tc := testContext{}
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(ctx, t, stopper)

// Lock the replica for the entire test.
r := tc.repl
r.raftMu.Lock()
defer r.raftMu.Unlock()
// Avoid additional raft processing after we're done with this replica because
// we've applied entries that aren't in the log.
defer r.mu.destroyStatus.Set(errors.New("boom"), destroyReasonRemoved)

sm := r.getStateMachine()

r.mu.Lock()
raftAppliedIndex := r.mu.state.RaftAppliedIndex
r.mu.Unlock()

descWriteRepr := func(v string) (roachpb.Request, []byte) {
b := tc.store.Engine().NewBatch()
defer b.Close()
key := keys.LocalMax
val := roachpb.MakeValueFromString("hello")
require.NoError(t, b.PutMVCC(storage.MVCCKey{
Timestamp: tc.Clock().Now(),
Key: key,
}, storage.MVCCValue{
Value: val,
}))
return roachpb.NewPut(key, val), b.Repr()
}

// Make two commands that have the same MaxLeaseIndex. They'll go
// into the same ephemeral batch and we expect that batch to accept
// the first command and reject the second.
var cmds []*replicatedCmd
for _, s := range []string{"earlier", "later"} {
req, repr := descWriteRepr(s)
ent := &raftlog.Entry{
Entry: raftpb.Entry{
Index: raftAppliedIndex + 1,
Type: raftpb.EntryNormal,
},
ID: makeIDKey(),
Cmd: kvserverpb.RaftCommand{
ProposerLeaseSequence: r.mu.state.Lease.Sequence,
MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1,
WriteBatch: &kvserverpb.WriteBatch{Data: repr},
},
}
var ba roachpb.BatchRequest
ba.Add(req)
cmd := &replicatedCmd{
ctx: ctx,
ReplicatedCmd: raftlog.ReplicatedCmd{Entry: ent},
proposal: &ProposalData{Request: &ba},
}
require.True(t, cmd.CanAckBeforeApplication())
cmds = append(cmds, cmd)
}

var rejs []bool
b := sm.NewEphemeralBatch()
for _, cmd := range cmds {
checkedCmd, err := b.Stage(cmd.ctx, cmd)
require.NoError(t, err)
rejs = append(rejs, checkedCmd.Rejected())
}
b.Close()
require.Equal(t, []bool{false, true}, rejs)
}

0 comments on commit 8b1c02f

Please sign in to comment.