From 8b1c02f8de5599ae3fa079efa892b78e53f9b8f4 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Mon, 9 Jan 2023 16:58:43 +0100 Subject: [PATCH] kvserver: fix bug in ephemeral app batch A bug was introduced recently[^1] which could lead to commands being early-acked despite ultimately being rejected during command application. The reason for this was that we previously updated the lease applied index of the ephemeral batch and didn't any more, meaning that commands that would fail the MaxLeaseIndex check on the actual apply batch could erroneously pass it on the ephemeral batch. Actually, I believe that change only made an existing bug (or at least semantic incorrectness) more common: it was removing that update altogether, but the old code blindly updated the MaxLeaseIndex, which also doesn't seem correct as not all commands (in particular leases) specify it in the first place. So conceivably, a command that would be rejected could have reset the MaxLeaseIndex and then allowed a later command that should also have been rejected to pass. I think this was mostly theoretical in nature since we only batch "trivial" commands but this PR improves the update to only occur for non-rejected commands. A regression test is added. Thanks to Nathan for bisecting and diagnosis. No release note because unreleased. Fixes #94409. [^1]: https://github.com/cockroachdb/cockroach/commit/e61de155017a1821d08ba0a414967927adabe0eb#diff-0fd06523f1f485024aef0c7a11d3472945d5ac7cf228d6007b2475ccf6f44cd6L795 Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica_app_batch.go | 3 + .../replica_application_state_machine_test.go | 83 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 383fa0936c3f..9ef7f5d26934 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -765,6 +765,9 @@ func (mb *ephemeralReplicaAppBatch) Stage( ) fr = replicaApplyTestingFilters(ctx, mb.r, cmd, fr) cmd.ForcedErrResult = fr + if !cmd.Rejected() && cmd.LeaseIndex > mb.state.LeaseAppliedIndex { + mb.state.LeaseAppliedIndex = cmd.LeaseIndex + } return cmd, nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index c0f78dd7210c..e420e2087e2a 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -14,10 +14,12 @@ import ( "context" "testing" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -395,3 +397,84 @@ func TestReplicaStateMachineRaftLogTruncationLooselyCoupled(t *testing.T) { }) }) } + +// TestReplicaStateMachineEphemeralAppBatchRejection is a regression test for +// #94409. It verifies that if two commands are in an ephemeral batch but the +// first command's MaxLeaseIndex prevents the second command from succeeding, we +// don't accidentally ack the second command. +func TestReplicaStateMachineEphemeralAppBatchRejection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + tc := testContext{} + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc.Start(ctx, t, stopper) + + // Lock the replica for the entire test. + r := tc.repl + r.raftMu.Lock() + defer r.raftMu.Unlock() + // Avoid additional raft processing after we're done with this replica because + // we've applied entries that aren't in the log. + defer r.mu.destroyStatus.Set(errors.New("boom"), destroyReasonRemoved) + + sm := r.getStateMachine() + + r.mu.Lock() + raftAppliedIndex := r.mu.state.RaftAppliedIndex + r.mu.Unlock() + + descWriteRepr := func(v string) (roachpb.Request, []byte) { + b := tc.store.Engine().NewBatch() + defer b.Close() + key := keys.LocalMax + val := roachpb.MakeValueFromString("hello") + require.NoError(t, b.PutMVCC(storage.MVCCKey{ + Timestamp: tc.Clock().Now(), + Key: key, + }, storage.MVCCValue{ + Value: val, + })) + return roachpb.NewPut(key, val), b.Repr() + } + + // Make two commands that have the same MaxLeaseIndex. They'll go + // into the same ephemeral batch and we expect that batch to accept + // the first command and reject the second. + var cmds []*replicatedCmd + for _, s := range []string{"earlier", "later"} { + req, repr := descWriteRepr(s) + ent := &raftlog.Entry{ + Entry: raftpb.Entry{ + Index: raftAppliedIndex + 1, + Type: raftpb.EntryNormal, + }, + ID: makeIDKey(), + Cmd: kvserverpb.RaftCommand{ + ProposerLeaseSequence: r.mu.state.Lease.Sequence, + MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1, + WriteBatch: &kvserverpb.WriteBatch{Data: repr}, + }, + } + var ba roachpb.BatchRequest + ba.Add(req) + cmd := &replicatedCmd{ + ctx: ctx, + ReplicatedCmd: raftlog.ReplicatedCmd{Entry: ent}, + proposal: &ProposalData{Request: &ba}, + } + require.True(t, cmd.CanAckBeforeApplication()) + cmds = append(cmds, cmd) + } + + var rejs []bool + b := sm.NewEphemeralBatch() + for _, cmd := range cmds { + checkedCmd, err := b.Stage(cmd.ctx, cmd) + require.NoError(t, err) + rejs = append(rejs, checkedCmd.Rejected()) + } + b.Close() + require.Equal(t, []bool{false, true}, rejs) +}