Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: use RaftAppliedIndexTerm to generate SnapshotMetadata, don't scan log #88596

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions pkg/kv/kvserver/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand Down Expand Up @@ -925,22 +924,11 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
func (b *replicaAppBatch) stageTrivialReplicatedEvalResult(
ctx context.Context, cmd *replicatedCmd,
) {
raftAppliedIndex := cmd.ent.Index
if raftAppliedIndex == 0 {
if cmd.ent.Index == 0 {
log.Fatalf(ctx, "raft entry with index 0")
}
b.state.RaftAppliedIndex = raftAppliedIndex
rs := cmd.decodedRaftEntry.replicatedResult().State
// We are post migration or this replicatedCmd is doing the migration.
if b.state.RaftAppliedIndexTerm > 0 || (rs != nil &&
rs.RaftAppliedIndexTerm == stateloader.RaftLogTermSignalForAddRaftAppliedIndexTermMigration) {
// Once we populate b.state.RaftAppliedIndexTerm it will flow into the
// persisted RangeAppliedState and into the in-memory representation in
// Replica.mu.state. The latter is used to initialize b.state, so future
// calls to this method will see that the migration has already happened
// and will continue to populate the term.
b.state.RaftAppliedIndexTerm = cmd.ent.Term
}
b.state.RaftAppliedIndex = cmd.ent.Index
b.state.RaftAppliedIndexTerm = cmd.ent.Term

if leaseAppliedIndex := cmd.leaseIndex; leaseAppliedIndex != 0 {
b.state.LeaseAppliedIndex = leaseAppliedIndex
Expand Down Expand Up @@ -1001,8 +989,6 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error {
r := b.r
r.mu.Lock()
r.mu.state.RaftAppliedIndex = b.state.RaftAppliedIndex
// RaftAppliedIndexTerm will be non-zero only when the
// AddRaftAppliedIndexTermMigration has happened.
r.mu.state.RaftAppliedIndexTerm = b.state.RaftAppliedIndexTerm
r.mu.state.LeaseAppliedIndex = b.state.LeaseAppliedIndex

Expand Down
26 changes: 4 additions & 22 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,10 @@ func (r *Replica) GetLeaseAppliedIndex() uint64 {
// Snapshot method.
func (r *replicaRaftStorage) Snapshot() (raftpb.Snapshot, error) {
r.mu.AssertHeld()
appliedIndex := r.mu.state.RaftAppliedIndex
term, err := r.Term(appliedIndex)
if err != nil {
return raftpb.Snapshot{}, err
}
return raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: appliedIndex,
Term: term,
Index: r.mu.state.RaftAppliedIndex,
Term: r.mu.state.RaftAppliedIndexTerm,
},
}, nil
}
Expand Down Expand Up @@ -605,17 +600,6 @@ func snapshot(
return OutgoingSnapshot{}, err
}

term, err := term(ctx, rsl, snap, rangeID, eCache, state.RaftAppliedIndex)
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && term != state.RaftAppliedIndexTerm {
return OutgoingSnapshot{},
errors.AssertionFailedf("unequal terms %d != %d", term, state.RaftAppliedIndexTerm)
}
if err != nil {
return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex)
}

return OutgoingSnapshot{
RaftEntryCache: eCache,
WithSideloaded: withSideloaded,
Expand All @@ -626,7 +610,7 @@ func snapshot(
Data: snapUUID.GetBytes(),
Metadata: raftpb.SnapshotMetadata{
Index: state.RaftAppliedIndex,
Term: term,
Term: state.RaftAppliedIndexTerm,
// Synthesize our raftpb.ConfState from desc.
ConfState: desc.Replicas().ConfState(),
},
Expand Down Expand Up @@ -979,9 +963,7 @@ func (r *Replica) applySnapshot(
log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
}
// If we've migrated to populating RaftAppliedIndexTerm, check that the term
// from the two sources are equal.
if state.RaftAppliedIndexTerm != 0 && state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
if state.RaftAppliedIndexTerm != nonemptySnap.Metadata.Term {
log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
}
Expand Down