From d3d262b73e9fa43f49edcb9d183f993320bb4b8a Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 9 Jan 2023 16:58:43 +0100 Subject: [PATCH] kvserver: fix bug in ephemeral app batch A bug was introduced recently[^1] which could lead to commands being early-acked despite ultimately being rejected during command application. The reason for this was that we previously updated the lease applied index of the ephemeral batch and didn't any more, meaning that commands that would fail the MaxLeaseIndex check on the actual apply batch could erroneously pass it on the ephemeral batch. Actually, I believe that change only made an existing bug (or at least semantic incorrectness) more common: it was removing that update altogether, but the old code blindly updated the MaxLeaseIndex, which also doesn't seem correct as not all commands (in particular leases) specify it in the first place. So conceivably, a command that would be rejected could have reset the MaxLeaseIndex and then allowed a later command that should also have been rejected to pass. I think this was mostly theoretical in nature since we only batch "trivial" commands but this PR improves the update to only occur for non-rejected commands. A regression test is added. Thanks to Nathan for bisecting and diagnosis. No release note because unreleased. Fixes #94409. [^1]: https://github.com/cockroachdb/cockroach/commit/e61de155017a1821d08ba0a414967927adabe0eb#diff-0fd06523f1f485024aef0c7a11d3472945d5ac7cf228d6007b2475ccf6f44cd6L795 Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica_app_batch.go | 6 ++ .../replica_application_state_machine_test.go | 83 +++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 383fa0936c3f..45632c42e202 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -513,6 +513,9 @@ func (b *replicaAppBatch) stageTrivialReplicatedEvalResult( b.state.RaftAppliedIndex = cmd.Index() b.state.RaftAppliedIndexTerm = cmd.Term + // NB: since the command is "trivial" we know the LeaseIndex field is set to + // something meaningful if it's nonzero (e.g. cmd is not a lease request). For + // a rejected command, cmd.LeaseIndex was zeroed out earlier. if leaseAppliedIndex := cmd.LeaseIndex; leaseAppliedIndex != 0 { b.state.LeaseAppliedIndex = leaseAppliedIndex } @@ -765,6 +768,9 @@ func (mb *ephemeralReplicaAppBatch) Stage( ) fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr) cmd.ForcedErrResult = fr + if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex { + mb.state.LeaseAppliedIndex = cmd.LeaseIndex + } return cmd, nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index c0f78dd7210c..e420e2087e2a 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -14,10 +14,12 @@ import ( "context" "testing" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -395,3 +397,84 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) { }) }) } + +// TestReplicaStateMachineEphemeralAppBatchRejection is a regression test for +// #94409. It verifies that if two commands are in an ephemeral batch but the +// first command's MaxLeaseIndex prevents the second command from succeeding, we +// don't accidentally ack the second command. +func TestReplicaStateMachineEphemeralAppBatchRejection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := testContext{} + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + + // Lock the replica for the entire test. + r := tc.repl + r.raftMu.Lock() + defer r.raftMu.Unlock() + // Avoid additional raft processing after we're done with this replica because + // we've applied entries that aren't in the log. + defer r.mu.destroyStatus.Set(errors.New("boom"), destroyReasonRemoved) + + sm := r.getStateMachine() + + r.mu.Lock() + raftAppliedIndex := r.mu.state.RaftAppliedIndex + r.mu.Unlock() + + descWriteRepr := func(v string) (roachpb.Request, []byte) { + b := tc.store.Engine().NewBatch() + defer b.Close() + key := keys.LocalMax + val := roachpb.MakeValueFromString("hello") + require.NoError(t, b.PutMVCC(storage.MVCCKey{ + Timestamp: tc.Clock().Now(), + Key: key, + }, storage.MVCCValue{ + Value: val, + })) + return roachpb.NewPut(key, val), b.Repr() + } + + // Make two commands that have the same MaxLeaseIndex. They'll go + // into the same ephemeral batch and we expect that batch to accept + // the first command and reject the second. + var cmds []*replicatedCmd + for _, s := range []string{"earlier", "later"} { + req, repr := descWriteRepr(s) + ent := &raftlog.Entry{ + Entry: raftpb.Entry{ + Index: raftAppliedIndex + 1, + Type: raftpb.EntryNormal, + }, + ID: makeIDKey(), + Cmd: kvserverpb.RaftCommand{ + ProposerLeaseSequence: r.mu.state.Lease.Sequence, + MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, + WriteBatch: &kvserverpb.WriteBatch{Data: repr}, + }, + } + var ba roachpb.BatchRequest + ba.Add(req) + cmd := &replicatedCmd{ + ctx: ctx, + ReplicatedCmd: raftlog.ReplicatedCmd{Entry: ent}, + proposal: &ProposalData{Request: &ba}, + } + require.True(t, cmd.CanAckBeforeApplication()) + cmds = append(cmds, cmd) + } + + var rejs []bool + b := sm.NewEphemeralBatch() + for _, cmd := range cmds { + checkedCmd, err := b.Stage(cmd.ctx, cmd) + require.NoError(t, err) + rejs = append(rejs, checkedCmd.Rejected()) + } + b.Close() + require.Equal(t, []bool{false, true}, rejs) +}